From 9fb166701696c5db898e988b103f345a7075a9a3 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Thu, 12 May 2022 12:15:58 +0000
Subject: [PATCH] #24 Add stream error callbacks

---
 include/ftl/protocol/error.hpp   |  4 +++-
 include/ftl/protocol/muxer.hpp   | 10 +---------
 include/ftl/protocol/streams.hpp |  9 ++++++++-
 src/streams/muxer.cpp            |  5 +++++
 src/streams/netstream.cpp        |  8 +++++---
 src/streams/streams.cpp          |  4 ++++
 test/muxer_unit.cpp              | 27 +++++++++++++++++++++++++++
 test/stream_integration.cpp      | 15 +++++++++++++++
 8 files changed, 68 insertions(+), 14 deletions(-)

diff --git a/include/ftl/protocol/error.hpp b/include/ftl/protocol/error.hpp
index 52575e4..97f5f2b 100644
--- a/include/ftl/protocol/error.hpp
+++ b/include/ftl/protocol/error.hpp
@@ -16,7 +16,9 @@ enum struct Error {
     kBadHandshake,
     kConnectionFailed,
     kSelfConnect,
-    kListen
+    kListen,
+    kURIAlreadyExists,
+    kURIDoesNotExist
 };
 
 }
diff --git a/include/ftl/protocol/muxer.hpp b/include/ftl/protocol/muxer.hpp
index 44041ad..f3d85a0 100644
--- a/include/ftl/protocol/muxer.hpp
+++ b/include/ftl/protocol/muxer.hpp
@@ -56,6 +56,7 @@ class Muxer : public Stream {
 		ftl::Handle handle;
 		ftl::Handle req_handle;
 		ftl::Handle avail_handle;
+		ftl::Handle err_handle;
 		int id = 0;
 		int fixed_fs = -1;
 	};
@@ -64,12 +65,7 @@ class Muxer : public Stream {
 	std::unordered_map<int, int> sourcecount_;
 	std::unordered_map<int64_t, FrameID> imap_;
 	std::unordered_map<FrameID, std::pair<FrameID, Muxer::StreamEntry*>> omap_;
-
 	std::list<StreamEntry> streams_;
-	//std::vector<std::pair<StreamEntry*,int>> revmap_[kMaxStreams];
-	//std::list<ftl::Handle> handles_;
-	//int nid_[kMaxStreams];
-	//StreamCallback cb_;
 	mutable SHARED_MUTEX mutex_;
 	std::atomic_int stream_ids_ = 0;
 	std::atomic_int framesets_ = 0;
@@ -79,10 +75,6 @@ class Muxer : public Stream {
 
 	/* On posting, map to output ID */
 	std::pair<FrameID, StreamEntry*> _mapToOutput(FrameID id) const;
-
-	//void _notify(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt);
-	//int _lookup(size_t fsid, StreamEntry *se, int ssid, int count);
-	//void _forward(const std::string &name);
 };
 
 }
diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp
index 5f24b61..2de2eec 100644
--- a/include/ftl/protocol/streams.hpp
+++ b/include/ftl/protocol/streams.hpp
@@ -12,13 +12,13 @@
 #include <ftl/protocol/channelSet.hpp>
 #include <ftl/protocol/packet.hpp>
 #include <ftl/protocol/frameid.hpp>
+#include <ftl/protocol/error.hpp>
 #include <string>
 #include <vector>
 #include <unordered_set>
 
 namespace ftl {
 namespace protocol {
-
 /* Represents a request for data through a stream */
 struct Request {
 	FrameID id;
@@ -181,6 +181,8 @@ class Stream {
 
 	virtual StreamType type() const { return StreamType::kUnknown; }
 
+	ftl::Handle onError(const std::function<bool(ftl::protocol::Error, const std::string &)> &cb) { return error_cb_.on(cb); }
+
 	protected:
 	void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt);
 
@@ -188,6 +190,8 @@ class Stream {
 
 	void request(const Request &req);
 
+	void error(ftl::protocol::Error, const std::string &str);
+
 	mutable SHARED_MUTEX mtx_;
 
 	private:
@@ -201,8 +205,11 @@ class Stream {
 	ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::Packet&> cb_;
 	ftl::Handler<const Request &> request_cb_;
 	ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_;
+	ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_;
 	std::unordered_map<int, FSState> state_;
 };
 
+using StreamPtr = std::shared_ptr<Stream>;
+
 }
 }
diff --git a/src/streams/muxer.cpp b/src/streams/muxer.cpp
index 6a84a99..3e7f592 100644
--- a/src/streams/muxer.cpp
+++ b/src/streams/muxer.cpp
@@ -95,6 +95,11 @@ void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) {
 		request(newRequest);
 		return true;
 	}));
+
+	se.err_handle = std::move(s->onError([this](ftl::protocol::Error err, const std::string &str) {
+		error(err, str);
+		return true;
+	}));
 }
 
 void Muxer::remove(const std::shared_ptr<Stream> &s) {
diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp
index 7cf263c..7292328 100644
--- a/src/streams/netstream.cpp
+++ b/src/streams/netstream.cpp
@@ -20,6 +20,7 @@ using ftl::protocol::Packet;
 using ftl::protocol::Channel;
 using ftl::protocol::Codec;
 using ftl::protocol::FrameID;
+using ftl::protocol::Error;
 using ftl::protocol::kAllFrames;
 using ftl::protocol::kAllFramesets;
 using std::string;
@@ -174,6 +175,7 @@ bool Net::post(const StreamPacket &spkt, const Packet &pkt) {
 				if (pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp);
 			} catch(...) {
 				// TODO: Some disconnect error
+				return false;
 			}
 		}
 	}
@@ -204,7 +206,7 @@ bool Net::begin() {
 	base_uri_ = u.getBaseURI();
 
 	if (net_->isBound(base_uri_)) {
-		LOG(ERROR) << "Stream already exists! - " << uri_;
+		error(Error::kURIAlreadyExists, std::string("Stream already exists: ") + uri_);
 		active_ = false;
 		return false;
 	}
@@ -373,7 +375,7 @@ bool Net::_enable(FrameID id) {
 		if (ws) {
 			peer_ = ws->id();
 		} else {
-			LOG(ERROR) << "Stream Peer not found";
+			error(Error::kURIDoesNotExist, std::string("Stream not found: ") + uri_);
 			return false;
 		}
 	}
@@ -448,7 +450,7 @@ void Net::_cleanUp() {
 			if (client.peerid == time_peer_) {
 				time_peer_ = ftl::UUID(0);
 			}
-			LOG(INFO) << "Remove peer: " << client.peerid.to_string();
+			DLOG(INFO) << "Remove peer: " << client.peerid.to_string();
 			i = clients_.erase(i);
 		}
 	}
diff --git a/src/streams/streams.cpp b/src/streams/streams.cpp
index 5296749..bf12a13 100644
--- a/src/streams/streams.cpp
+++ b/src/streams/streams.cpp
@@ -142,3 +142,7 @@ void Stream::seen(FrameID id, ftl::protocol::Channel channel) {
 void Stream::request(const ftl::protocol::Request &req) {
 	request_cb_.trigger(req);
 }
+
+void Stream::error(ftl::protocol::Error err, const std::string &str) {
+	error_cb_.trigger(err, str);
+}
diff --git a/test/muxer_unit.cpp b/test/muxer_unit.cpp
index 03d2991..bac6374 100644
--- a/test/muxer_unit.cpp
+++ b/test/muxer_unit.cpp
@@ -38,6 +38,10 @@ class TestStream : public ftl::protocol::Stream {
     void forceSeen(FrameID id, Channel channel) {
         seen(id, channel);
     }
+
+    void fakeError(ftl::protocol::Error err, const std::string &str) {
+        error(err, str);
+    }
 };
 
 TEST_CASE("Muxer post, distinct framesets", "[stream]") {
@@ -570,3 +574,26 @@ TEST_CASE("Muxer enabledChannels", "[stream]") {
         REQUIRE( set.size() == 0 );
     }
 }
+
+TEST_CASE("Muxer onError", "[stream]") {
+	std::unique_ptr<Muxer> mux = std::make_unique<Muxer>();
+	REQUIRE(mux);
+
+    std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>();
+    REQUIRE(s1);
+    std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>();
+    REQUIRE(s2);
+
+    mux->add(s1);
+	mux->add(s2);
+
+    ftl::protocol::Error seenErr = ftl::protocol::Error::kNoError;
+    auto h = mux->onError([&seenErr](ftl::protocol::Error err, const std::string &str) {
+        seenErr = err;
+        return true;
+    });
+
+    s1->fakeError(ftl::protocol::Error::kUnknown, "Unknown");
+
+    REQUIRE( seenErr == ftl::protocol::Error::kUnknown );
+}
diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp
index 66f8eee..a9baf1b 100644
--- a/test/stream_integration.cpp
+++ b/test/stream_integration.cpp
@@ -21,6 +21,21 @@ TEST_CASE("TCP Stream", "[net]") {
 	auto p = ftl::connectNode(uri);
 	p->waitConnection(5);
 
+	SECTION("fails if stream doesn't exist") {
+		auto s1 = self->getStream("ftl://mystream_bad");
+		REQUIRE( s1 );
+
+		auto seenError = ftl::protocol::Error::kNoError;
+		auto h = s1->onError([&seenError](ftl::protocol::Error err, const std::string &str) {
+			seenError = err;
+			return true;
+		});
+
+		REQUIRE( s1->begin() );
+		REQUIRE( !s1->enable(FrameID(0, 0)) );
+		REQUIRE( seenError == ftl::protocol::Error::kURIDoesNotExist );
+	}
+
 	SECTION("single enabled packet stream") {
 		std::condition_variable cv;
 		std::unique_lock<std::mutex> lk(mtx);
-- 
GitLab