diff --git a/include/ftl/protocol/error.hpp b/include/ftl/protocol/error.hpp index 52575e4f845e266fb268c130e453f77eac45fc9b..97f5f2bb23f566c70417b90879db94758e4fb421 100644 --- a/include/ftl/protocol/error.hpp +++ b/include/ftl/protocol/error.hpp @@ -16,7 +16,9 @@ enum struct Error { kBadHandshake, kConnectionFailed, kSelfConnect, - kListen + kListen, + kURIAlreadyExists, + kURIDoesNotExist }; } diff --git a/include/ftl/protocol/muxer.hpp b/include/ftl/protocol/muxer.hpp index 44041ad71c0a0b4caae32d7dcd045db7f75213c8..f3d85a0e449c2194307e1305691d9545c3569ad4 100644 --- a/include/ftl/protocol/muxer.hpp +++ b/include/ftl/protocol/muxer.hpp @@ -56,6 +56,7 @@ class Muxer : public Stream { ftl::Handle handle; ftl::Handle req_handle; ftl::Handle avail_handle; + ftl::Handle err_handle; int id = 0; int fixed_fs = -1; }; @@ -64,12 +65,7 @@ class Muxer : public Stream { std::unordered_map<int, int> sourcecount_; std::unordered_map<int64_t, FrameID> imap_; std::unordered_map<FrameID, std::pair<FrameID, Muxer::StreamEntry*>> omap_; - std::list<StreamEntry> streams_; - //std::vector<std::pair<StreamEntry*,int>> revmap_[kMaxStreams]; - //std::list<ftl::Handle> handles_; - //int nid_[kMaxStreams]; - //StreamCallback cb_; mutable SHARED_MUTEX mutex_; std::atomic_int stream_ids_ = 0; std::atomic_int framesets_ = 0; @@ -79,10 +75,6 @@ class Muxer : public Stream { /* On posting, map to output ID */ std::pair<FrameID, StreamEntry*> _mapToOutput(FrameID id) const; - - //void _notify(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt); - //int _lookup(size_t fsid, StreamEntry *se, int ssid, int count); - //void _forward(const std::string &name); }; } diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp index 5f24b61504eb46d4d8f96ceb2c154c2f2fe1c758..2de2eec79cd1d98904e6f479590b886a5043506d 100644 --- a/include/ftl/protocol/streams.hpp +++ b/include/ftl/protocol/streams.hpp @@ -12,13 +12,13 @@ #include <ftl/protocol/channelSet.hpp> #include <ftl/protocol/packet.hpp> #include <ftl/protocol/frameid.hpp> +#include <ftl/protocol/error.hpp> #include <string> #include <vector> #include <unordered_set> namespace ftl { namespace protocol { - /* Represents a request for data through a stream */ struct Request { FrameID id; @@ -181,6 +181,8 @@ class Stream { virtual StreamType type() const { return StreamType::kUnknown; } + ftl::Handle onError(const std::function<bool(ftl::protocol::Error, const std::string &)> &cb) { return error_cb_.on(cb); } + protected: void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt); @@ -188,6 +190,8 @@ class Stream { void request(const Request &req); + void error(ftl::protocol::Error, const std::string &str); + mutable SHARED_MUTEX mtx_; private: @@ -201,8 +205,11 @@ class Stream { ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::Packet&> cb_; ftl::Handler<const Request &> request_cb_; ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_; + ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_; std::unordered_map<int, FSState> state_; }; +using StreamPtr = std::shared_ptr<Stream>; + } } diff --git a/src/streams/muxer.cpp b/src/streams/muxer.cpp index 6a84a9902da9087e153ff7865811adc32cf83080..3e7f592a9b231ae8ed69b1de180da0bbb87182f0 100644 --- a/src/streams/muxer.cpp +++ b/src/streams/muxer.cpp @@ -95,6 +95,11 @@ void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) { request(newRequest); return true; })); + + se.err_handle = std::move(s->onError([this](ftl::protocol::Error err, const std::string &str) { + error(err, str); + return true; + })); } void Muxer::remove(const std::shared_ptr<Stream> &s) { diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index 7cf263c7f11adea62d062665b7d4d00764ba2e8f..7292328bc7855b1506646b978ee76ba0c1f797c8 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -20,6 +20,7 @@ using ftl::protocol::Packet; using ftl::protocol::Channel; using ftl::protocol::Codec; using ftl::protocol::FrameID; +using ftl::protocol::Error; using ftl::protocol::kAllFrames; using ftl::protocol::kAllFramesets; using std::string; @@ -174,6 +175,7 @@ bool Net::post(const StreamPacket &spkt, const Packet &pkt) { if (pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp); } catch(...) { // TODO: Some disconnect error + return false; } } } @@ -204,7 +206,7 @@ bool Net::begin() { base_uri_ = u.getBaseURI(); if (net_->isBound(base_uri_)) { - LOG(ERROR) << "Stream already exists! - " << uri_; + error(Error::kURIAlreadyExists, std::string("Stream already exists: ") + uri_); active_ = false; return false; } @@ -373,7 +375,7 @@ bool Net::_enable(FrameID id) { if (ws) { peer_ = ws->id(); } else { - LOG(ERROR) << "Stream Peer not found"; + error(Error::kURIDoesNotExist, std::string("Stream not found: ") + uri_); return false; } } @@ -448,7 +450,7 @@ void Net::_cleanUp() { if (client.peerid == time_peer_) { time_peer_ = ftl::UUID(0); } - LOG(INFO) << "Remove peer: " << client.peerid.to_string(); + DLOG(INFO) << "Remove peer: " << client.peerid.to_string(); i = clients_.erase(i); } } diff --git a/src/streams/streams.cpp b/src/streams/streams.cpp index 52967498822b371470889427c9e5ab05286b4e1a..bf12a13c415526135d9a886d815d7ee1687ff3b2 100644 --- a/src/streams/streams.cpp +++ b/src/streams/streams.cpp @@ -142,3 +142,7 @@ void Stream::seen(FrameID id, ftl::protocol::Channel channel) { void Stream::request(const ftl::protocol::Request &req) { request_cb_.trigger(req); } + +void Stream::error(ftl::protocol::Error err, const std::string &str) { + error_cb_.trigger(err, str); +} diff --git a/test/muxer_unit.cpp b/test/muxer_unit.cpp index 03d2991fb854079fb8a2f00f9e3847c3687dbacd..bac6374cff620be068102a28671ad8995d5ff483 100644 --- a/test/muxer_unit.cpp +++ b/test/muxer_unit.cpp @@ -38,6 +38,10 @@ class TestStream : public ftl::protocol::Stream { void forceSeen(FrameID id, Channel channel) { seen(id, channel); } + + void fakeError(ftl::protocol::Error err, const std::string &str) { + error(err, str); + } }; TEST_CASE("Muxer post, distinct framesets", "[stream]") { @@ -570,3 +574,26 @@ TEST_CASE("Muxer enabledChannels", "[stream]") { REQUIRE( set.size() == 0 ); } } + +TEST_CASE("Muxer onError", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + ftl::protocol::Error seenErr = ftl::protocol::Error::kNoError; + auto h = mux->onError([&seenErr](ftl::protocol::Error err, const std::string &str) { + seenErr = err; + return true; + }); + + s1->fakeError(ftl::protocol::Error::kUnknown, "Unknown"); + + REQUIRE( seenErr == ftl::protocol::Error::kUnknown ); +} diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp index 66f8eeedfa654e4eb22ff34bcbe8cdeb161cd0a1..a9baf1bf8aff7842379e85b4c2d065c966a48afb 100644 --- a/test/stream_integration.cpp +++ b/test/stream_integration.cpp @@ -21,6 +21,21 @@ TEST_CASE("TCP Stream", "[net]") { auto p = ftl::connectNode(uri); p->waitConnection(5); + SECTION("fails if stream doesn't exist") { + auto s1 = self->getStream("ftl://mystream_bad"); + REQUIRE( s1 ); + + auto seenError = ftl::protocol::Error::kNoError; + auto h = s1->onError([&seenError](ftl::protocol::Error err, const std::string &str) { + seenError = err; + return true; + }); + + REQUIRE( s1->begin() ); + REQUIRE( !s1->enable(FrameID(0, 0)) ); + REQUIRE( seenError == ftl::protocol::Error::kURIDoesNotExist ); + } + SECTION("single enabled packet stream") { std::condition_variable cv; std::unique_lock<std::mutex> lk(mtx);