Skip to content
Snippets Groups Projects
Commit 03ae0795 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/#25' into 'main'

#25 Allow channels to become unavailable

See merge request beyondaka/beyond-protocol!57
parents 48fe3b66 75743c18
No related branches found
No related tags found
No related merge requests found
...@@ -21,6 +21,8 @@ using ChannelSet = std::unordered_set<ftl::protocol::Channel>; ...@@ -21,6 +21,8 @@ using ChannelSet = std::unordered_set<ftl::protocol::Channel>;
ftl::protocol::ChannelSet operator&(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b); ftl::protocol::ChannelSet operator&(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b);
ftl::protocol::ChannelSet operator|(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b);
ftl::protocol::ChannelSet operator-(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b); ftl::protocol::ChannelSet operator-(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b);
inline ftl::protocol::ChannelSet &operator+=(ftl::protocol::ChannelSet &t, ftl::protocol::Channel c) { inline ftl::protocol::ChannelSet &operator+=(ftl::protocol::ChannelSet &t, ftl::protocol::Channel c) {
......
...@@ -12,9 +12,10 @@ ...@@ -12,9 +12,10 @@
namespace ftl { namespace ftl {
namespace protocol { namespace protocol {
inline bool isVideo(Channel c) { return static_cast<int>(c) < 32; } constexpr bool isVideo(Channel c) { return static_cast<int>(c) < 32; }
inline bool isAudio(Channel c) { return static_cast<int>(c) >= 32 && static_cast<int>(c) < 64; } constexpr bool isAudio(Channel c) { return static_cast<int>(c) >= 32 && static_cast<int>(c) < 64; }
inline bool isData(Channel c) { return static_cast<int>(c) >= 64; } constexpr bool isData(Channel c) { return static_cast<int>(c) >= 64; }
constexpr bool isPersistent(Channel c) { return static_cast<int>(c) >= 64; }
/** Obtain a string name for channel. */ /** Obtain a string name for channel. */
std::string name(Channel c); std::string name(Channel c);
...@@ -31,7 +32,7 @@ Channel fromName(const std::string &name); ...@@ -31,7 +32,7 @@ Channel fromName(const std::string &name);
int type(Channel c); int type(Channel c);
/** @deprecated */ /** @deprecated */
inline bool isFloatChannel(ftl::protocol::Channel chan) { constexpr bool isFloatChannel(ftl::protocol::Channel chan) {
switch (chan) { switch (chan) {
case Channel::kGroundTruth : case Channel::kGroundTruth :
case Channel::kDepth : case Channel::kDepth :
......
...@@ -382,7 +382,9 @@ class Stream { ...@@ -382,7 +382,9 @@ class Stream {
struct FSState { struct FSState {
bool enabled = false; bool enabled = false;
ftl::protocol::ChannelSet selected; ftl::protocol::ChannelSet selected;
ftl::protocol::ChannelSet available; ftl::protocol::ChannelSet availablePersistent;
std::atomic_uint64_t availableLast = 0;
std::atomic_uint64_t availableNext = 0;
// TODO(Nick): Add a name and metadata // TODO(Nick): Add a name and metadata
}; };
...@@ -390,7 +392,10 @@ class Stream { ...@@ -390,7 +392,10 @@ class Stream {
ftl::Handler<const Request &> request_cb_; ftl::Handler<const Request &> request_cb_;
ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_; ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_;
ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_; ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_;
std::unordered_map<int, FSState> state_; std::unordered_map<int, std::shared_ptr<FSState>> state_;
std::shared_ptr<FSState> _getState(FrameID id);
std::shared_ptr<FSState> _getState(FrameID id) const;
}; };
using StreamPtr = std::shared_ptr<Stream>; using StreamPtr = std::shared_ptr<Stream>;
......
...@@ -16,6 +16,13 @@ ChannelSet operator&(const ChannelSet &a, const ChannelSet &b) { ...@@ -16,6 +16,13 @@ ChannelSet operator&(const ChannelSet &a, const ChannelSet &b) {
return result; return result;
} }
ChannelSet operator|(const ChannelSet &a, const ChannelSet &b) {
ChannelSet result;
result.insert(a.begin(), a.end());
result.insert(b.begin(), b.end());
return result;
}
ChannelSet operator-(const ChannelSet &a, const ChannelSet &b) { ChannelSet operator-(const ChannelSet &a, const ChannelSet &b) {
ChannelSet result; ChannelSet result;
for (auto &i : a) { for (auto &i : a) {
......
...@@ -5,11 +5,14 @@ ...@@ -5,11 +5,14 @@
*/ */
#include <ftl/protocol/streams.hpp> #include <ftl/protocol/streams.hpp>
#include <ftl/protocol/channelUtils.hpp>
#include <ftl/protocol/channelSet.hpp>
using ftl::protocol::Stream; using ftl::protocol::Stream;
using ftl::protocol::Channel; using ftl::protocol::Channel;
using ftl::protocol::ChannelSet; using ftl::protocol::ChannelSet;
using ftl::protocol::FrameID; using ftl::protocol::FrameID;
using ftl::protocol::isPersistent;
std::string Stream::name() const { std::string Stream::name() const {
return "Unknown"; return "Unknown";
...@@ -21,34 +24,46 @@ bool Stream::available(FrameID id) const { ...@@ -21,34 +24,46 @@ bool Stream::available(FrameID id) const {
} }
bool Stream::available(FrameID id, Channel channel) const { bool Stream::available(FrameID id, Channel channel) const {
auto state = _getState(id);
if (!state) return false;
if (isPersistent(channel)) {
SHARED_LOCK(mtx_, lk); SHARED_LOCK(mtx_, lk);
auto it = state_.find(id); return state->availablePersistent.count(channel) > 0;
if (it != state_.end()) { } else {
return it->second.available.count(channel) > 0; return state->availableLast & (1ull << static_cast<int>(channel));
} }
return false;
} }
bool Stream::available(FrameID id, const ChannelSet &channels) const { bool Stream::available(FrameID id, const ChannelSet &channels) const {
SHARED_LOCK(mtx_, lk); auto state = _getState(id);
auto it = state_.find(id); if (!state) return false;
if (it != state_.end()) {
const auto &set = it->second.available;
for (auto channel : channels) { for (auto channel : channels) {
if (set.count(channel) == 0) return false; if (isPersistent(channel)) {
SHARED_LOCK(mtx_, lk);
if (state->availablePersistent.count(channel) == 0) return false;
} else {
if ((state->availableLast & (1ull << static_cast<int>(channel))) == 0) return false;
} }
return true;
} }
return false; return true;
} }
ftl::protocol::ChannelSet Stream::channels(FrameID id) const { ftl::protocol::ChannelSet Stream::channels(FrameID id) const {
auto state = _getState(id);
if (!state) return {};
SHARED_LOCK(mtx_, lk); SHARED_LOCK(mtx_, lk);
auto it = state_.find(id); ChannelSet result = state->availablePersistent;
if (it != state_.end()) { lk.unlock();
return it->second.available;
uint64_t last = state->availableLast;
for (int i = 0; i < 64; ++i) {
if ((1ull << i) & last) {
result.insert(static_cast<Channel>(i));
} }
return {}; }
return result;
} }
std::unordered_set<FrameID> Stream::frames() const { std::unordered_set<FrameID> Stream::frames() const {
...@@ -64,7 +79,8 @@ std::unordered_set<FrameID> Stream::enabled() const { ...@@ -64,7 +79,8 @@ std::unordered_set<FrameID> Stream::enabled() const {
SHARED_LOCK(mtx_, lk); SHARED_LOCK(mtx_, lk);
std::unordered_set<FrameID> result; std::unordered_set<FrameID> result;
for (const auto &s : state_) { for (const auto &s : state_) {
if (s.second.enabled) { if (!s.second) continue;
if (s.second->enabled) {
result.emplace(s.first); result.emplace(s.first);
} }
} }
...@@ -72,19 +88,17 @@ std::unordered_set<FrameID> Stream::enabled() const { ...@@ -72,19 +88,17 @@ std::unordered_set<FrameID> Stream::enabled() const {
} }
bool Stream::enabled(FrameID id) const { bool Stream::enabled(FrameID id) const {
SHARED_LOCK(mtx_, lk); auto state = _getState(id);
auto it = state_.find(id); if (!state) return false;
if (it != state_.end()) { return state->enabled;
return it->second.enabled;
}
return false;
} }
bool Stream::enabled(FrameID id, ftl::protocol::Channel channel) const { bool Stream::enabled(FrameID id, ftl::protocol::Channel channel) const {
SHARED_LOCK(mtx_, lk); SHARED_LOCK(mtx_, lk);
auto it = state_.find(id); auto it = state_.find(id);
if (it != state_.end()) { if (it != state_.end()) {
return it->second.selected.count(channel) > 0; if (!it->second) return false;
return it->second->selected.count(channel) > 0;
} }
return false; return false;
} }
...@@ -93,7 +107,8 @@ ftl::protocol::ChannelSet Stream::enabledChannels(FrameID id) const { ...@@ -93,7 +107,8 @@ ftl::protocol::ChannelSet Stream::enabledChannels(FrameID id) const {
SHARED_LOCK(mtx_, lk); SHARED_LOCK(mtx_, lk);
auto it = state_.find(id); auto it = state_.find(id);
if (it != state_.end()) { if (it != state_.end()) {
return it->second.selected; if (!it->second) return {};
return it->second->selected;
} }
return {}; return {};
} }
...@@ -101,49 +116,55 @@ ftl::protocol::ChannelSet Stream::enabledChannels(FrameID id) const { ...@@ -101,49 +116,55 @@ ftl::protocol::ChannelSet Stream::enabledChannels(FrameID id) const {
bool Stream::enable(FrameID id) { bool Stream::enable(FrameID id) {
UNIQUE_LOCK(mtx_, lk); UNIQUE_LOCK(mtx_, lk);
auto &p = state_[id]; auto &p = state_[id];
p.enabled = true; if (!p) p = std::make_shared<Stream::FSState>();
p->enabled = true;
return true; return true;
} }
bool Stream::enable(FrameID id, ftl::protocol::Channel channel) { bool Stream::enable(FrameID id, ftl::protocol::Channel channel) {
UNIQUE_LOCK(mtx_, lk); UNIQUE_LOCK(mtx_, lk);
auto &p = state_[id]; auto &p = state_[id];
p.enabled = true; if (!p) p = std::make_shared<Stream::FSState>();
p.selected.insert(channel); p->enabled = true;
p->selected.insert(channel);
return true; return true;
} }
bool Stream::enable(FrameID id, const ftl::protocol::ChannelSet &channels) { bool Stream::enable(FrameID id, const ftl::protocol::ChannelSet &channels) {
UNIQUE_LOCK(mtx_, lk); UNIQUE_LOCK(mtx_, lk);
auto &p = state_[id]; auto &p = state_[id];
p.enabled = true; if (!p) p = std::make_shared<Stream::FSState>();
p.selected.insert(channels.begin(), channels.end()); p->enabled = true;
p->selected.insert(channels.begin(), channels.end());
return true; return true;
} }
void Stream::disable(FrameID id) { void Stream::disable(FrameID id) {
UNIQUE_LOCK(mtx_, lk); UNIQUE_LOCK(mtx_, lk);
auto &p = state_[id]; auto &p = state_[id];
p.enabled = false; if (!p) p = std::make_shared<Stream::FSState>();
p->enabled = false;
} }
void Stream::disable(FrameID id, ftl::protocol::Channel channel) { void Stream::disable(FrameID id, ftl::protocol::Channel channel) {
UNIQUE_LOCK(mtx_, lk); UNIQUE_LOCK(mtx_, lk);
auto &p = state_[id]; auto &p = state_[id];
p.selected.erase(channel); if (!p) p = std::make_shared<Stream::FSState>();
if (p.selected.size() == 0) { p->selected.erase(channel);
p.enabled = false; if (p->selected.size() == 0) {
p->enabled = false;
} }
} }
void Stream::disable(FrameID id, const ftl::protocol::ChannelSet &channels) { void Stream::disable(FrameID id, const ftl::protocol::ChannelSet &channels) {
UNIQUE_LOCK(mtx_, lk); UNIQUE_LOCK(mtx_, lk);
auto &p = state_[id]; auto &p = state_[id];
if (!p) p = std::make_shared<Stream::FSState>();
for (const auto &c : channels) { for (const auto &c : channels) {
p.selected.erase(c); p->selected.erase(c);
} }
if (p.selected.size() == 0) { if (p->selected.size() == 0) {
p.enabled = false; p->enabled = false;
} }
} }
...@@ -158,16 +179,43 @@ void Stream::trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protoco ...@@ -158,16 +179,43 @@ void Stream::trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protoco
cb_.trigger(spkt, pkt); cb_.trigger(spkt, pkt);
} }
std::shared_ptr<Stream::FSState> Stream::_getState(FrameID id) {
{
SHARED_LOCK(mtx_, lk);
auto it = state_.find(id);
if (it != state_.end()) return it->second;
}
UNIQUE_LOCK(mtx_, lk);
if (!state_[id]) state_[id] = std::make_shared<Stream::FSState>();
return state_[id];
}
std::shared_ptr<Stream::FSState> Stream::_getState(FrameID id) const {
SHARED_LOCK(mtx_, lk);
auto it = state_.find(id);
if (it != state_.end()) return it->second;
return nullptr;
}
void Stream::seen(FrameID id, ftl::protocol::Channel channel) { void Stream::seen(FrameID id, ftl::protocol::Channel channel) {
if (!available(id, channel)) { auto state = _getState(id);
if (channel == Channel::kEndFrame) {
state->availableLast = static_cast<uint64_t>(state->availableNext);
state->availableNext = 0;
} else if (isPersistent(channel)) {
{ {
SHARED_LOCK(mtx_, lk);
if (state->availablePersistent.count(channel) > 0) return;
}
UNIQUE_LOCK(mtx_, lk); UNIQUE_LOCK(mtx_, lk);
auto &p = state_[id]; state->availablePersistent.insert(channel);
p.available.insert(channel); } else {
state->availableNext |= 1ull << static_cast<int>(channel);
if (state->availableLast & (1ull << static_cast<int>(channel))) return;
} }
avail_cb_.trigger(id, channel); avail_cb_.trigger(id, channel);
} }
}
void Stream::request(const ftl::protocol::Request &req) { void Stream::request(const ftl::protocol::Request &req) {
request_cb_.trigger(req); request_cb_.trigger(req);
......
...@@ -478,21 +478,34 @@ TEST_CASE("Muxer available", "[stream]") { ...@@ -478,21 +478,34 @@ TEST_CASE("Muxer available", "[stream]") {
REQUIRE( mux->available(id2) ); REQUIRE( mux->available(id2) );
} }
SECTION("available channel when seen") { SECTION("available (persistent) channel when seen") {
FrameID id1(0, 1);
REQUIRE( !s1->available(id1, Channel::kPose) );
REQUIRE( !mux->available(id1, Channel::kPose) );
s1->forceSeen(id1, Channel::kPose);
REQUIRE( s1->available(id1, Channel::kPose) );
REQUIRE( mux->available(id1, Channel::kPose) );
}
SECTION("available (temp) channel when seen") {
FrameID id1(0, 1); FrameID id1(0, 1);
REQUIRE( !s1->available(id1, Channel::kColour) ); REQUIRE( !s1->available(id1, Channel::kColour) );
REQUIRE( !mux->available(id1, Channel::kColour) ); REQUIRE( !mux->available(id1, Channel::kColour) );
s1->forceSeen(id1, Channel::kColour); s1->forceSeen(id1, Channel::kColour);
s1->forceSeen(id1, Channel::kEndFrame);
REQUIRE( s1->available(id1, Channel::kColour) ); REQUIRE( s1->available(id1, Channel::kColour) );
REQUIRE( mux->available(id1, Channel::kColour) ); REQUIRE( mux->available(id1, Channel::kColour) );
s1->forceSeen(id1, Channel::kEndFrame);
REQUIRE( !s1->available(id1, Channel::kColour) );
REQUIRE( !mux->available(id1, Channel::kColour) );
} }
SECTION("not available when wrong channel seen") { SECTION("not available when wrong channel seen") {
FrameID id1(0, 1); FrameID id1(0, 1);
s1->forceSeen(id1, Channel::kDepth); s1->forceSeen(id1, Channel::kCalibration2);
REQUIRE( mux->available(id1) ); REQUIRE( mux->available(id1) );
REQUIRE( !s1->available(id1, Channel::kColour) ); REQUIRE( !s1->available(id1, Channel::kPose) );
REQUIRE( !mux->available(id1, Channel::kColour) ); REQUIRE( !mux->available(id1, Channel::kPose) );
} }
SECTION("available channel set when seen all") { SECTION("available channel set when seen all") {
...@@ -502,6 +515,7 @@ TEST_CASE("Muxer available", "[stream]") { ...@@ -502,6 +515,7 @@ TEST_CASE("Muxer available", "[stream]") {
REQUIRE( !mux->available(id1, set) ); REQUIRE( !mux->available(id1, set) );
s1->forceSeen(id1, Channel::kColour); s1->forceSeen(id1, Channel::kColour);
s1->forceSeen(id1, Channel::kDepth); s1->forceSeen(id1, Channel::kDepth);
s1->forceSeen(id1, Channel::kEndFrame);
REQUIRE( s1->available(id1, set) ); REQUIRE( s1->available(id1, set) );
REQUIRE( mux->available(id1, set) ); REQUIRE( mux->available(id1, set) );
} }
...@@ -512,6 +526,7 @@ TEST_CASE("Muxer available", "[stream]") { ...@@ -512,6 +526,7 @@ TEST_CASE("Muxer available", "[stream]") {
REQUIRE( !s1->available(id1, set) ); REQUIRE( !s1->available(id1, set) );
REQUIRE( !mux->available(id1, set) ); REQUIRE( !mux->available(id1, set) );
s1->forceSeen(id1, Channel::kDepth); s1->forceSeen(id1, Channel::kDepth);
s1->forceSeen(id1, Channel::kEndFrame);
REQUIRE( !s1->available(id1, set) ); REQUIRE( !s1->available(id1, set) );
REQUIRE( !mux->available(id1, set) ); REQUIRE( !mux->available(id1, set) );
} }
...@@ -625,6 +640,7 @@ TEST_CASE("Muxer channels", "[stream]") { ...@@ -625,6 +640,7 @@ TEST_CASE("Muxer channels", "[stream]") {
s1->forceSeen(id1, Channel::kColour); s1->forceSeen(id1, Channel::kColour);
s1->forceSeen(id1, Channel::kDepth); s1->forceSeen(id1, Channel::kDepth);
s1->forceSeen(id1, Channel::kEndFrame);
auto set = mux->channels(id1); auto set = mux->channels(id1);
REQUIRE( set.size() == 2 ); REQUIRE( set.size() == 2 );
...@@ -656,9 +672,11 @@ TEST_CASE("Muxer enabledChannels", "[stream]") { ...@@ -656,9 +672,11 @@ TEST_CASE("Muxer enabledChannels", "[stream]") {
s1->forceSeen(id1, Channel::kColour); s1->forceSeen(id1, Channel::kColour);
s1->forceSeen(id1, Channel::kDepth); s1->forceSeen(id1, Channel::kDepth);
s1->forceSeen(id1, Channel::kPose);
s1->forceSeen(id1, Channel::kEndFrame);
auto set = mux->channels(id1); auto set = mux->channels(id1);
REQUIRE( set.size() == 2 ); REQUIRE( set.size() == 3 );
REQUIRE( mux->enable(id1, set) ); REQUIRE( mux->enable(id1, set) );
......
...@@ -201,6 +201,9 @@ TEST_CASE("Net stream can see received data") { ...@@ -201,6 +201,9 @@ TEST_CASE("Net stream can see received data") {
spkt.channel = Channel::kColour; spkt.channel = Channel::kColour;
writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt));
p->data(); p->data();
spkt.channel = Channel::kEndFrame;
writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt));
p->data();
sleep_for(milliseconds(50)); sleep_for(milliseconds(50));
REQUIRE( seenReq ); REQUIRE( seenReq );
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment