From 3d5b73e695001dbf2a3e6ef8aec62d07cf36a83a Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Mon, 9 May 2022 13:55:39 +0100
Subject: [PATCH] Add netstream and test

---
 CMakeLists.txt                    |   8 +-
 include/ftl/protocol.hpp          |   3 +-
 include/ftl/protocol/self.hpp     |  10 +
 include/ftl/protocol/streams.hpp  |   2 +
 include/ftl/threads.hpp           |   2 +-
 src/peer.cpp                      |   2 +-
 src/protocol.cpp                  |  18 +-
 src/self.cpp                      |  31 ++
 src/{ => streams}/broadcaster.cpp |   0
 src/{ => streams}/muxer.cpp       |   0
 src/streams/netstream.cpp         | 510 ++++++++++++++++++++++++++++++
 src/streams/netstream.hpp         | 118 +++++++
 src/streams/packetMsgpack.hpp     |  21 ++
 src/{ => streams}/streams.cpp     |   0
 src/universe.cpp                  |   5 +
 src/universe.hpp                  |  14 +-
 test/CMakeLists.txt               |  13 +-
 test/net_integration.cpp          |  14 +-
 test/stream_integration.cpp       |  65 ++++
 19 files changed, 809 insertions(+), 27 deletions(-)
 rename src/{ => streams}/broadcaster.cpp (100%)
 rename src/{ => streams}/muxer.cpp (100%)
 create mode 100644 src/streams/netstream.cpp
 create mode 100644 src/streams/netstream.hpp
 create mode 100644 src/streams/packetMsgpack.hpp
 rename src/{ => streams}/streams.cpp (100%)
 create mode 100644 test/stream_integration.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 517f260..8d1e0e8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -171,10 +171,12 @@ add_library(beyond-protocol STATIC
 	src/protocol/websocket.cpp
 	src/base64.cpp
 	src/protocol.cpp
-	src/streams.cpp
+	src/streams/streams.cpp
 	src/channelSet.cpp
-	src/muxer.cpp
-	src/broadcaster.cpp
+	src/streams/muxer.cpp
+	src/streams/broadcaster.cpp
+	src/streams/netstream.cpp
+	src/streams/filestream.cpp
 )
 
 target_include_directories(beyond-protocol PUBLIC
diff --git a/include/ftl/protocol.hpp b/include/ftl/protocol.hpp
index c735e0d..0244c66 100644
--- a/include/ftl/protocol.hpp
+++ b/include/ftl/protocol.hpp
@@ -25,7 +25,8 @@ extern ftl::UUID id;
 std::shared_ptr<ftl::protocol::Self> getSelf();
 std::shared_ptr<ftl::protocol::Self> createDummySelf();
 std::shared_ptr<ftl::protocol::Service> setServiceProvider(const std::string &uri);
-std::shared_ptr<ftl::protocol::Node> createNode(const std::string &uri);
+std::shared_ptr<ftl::protocol::Node> connectNode(const std::string &uri);
 std::shared_ptr<ftl::protocol::Stream> createStream(const std::string &uri);
+std::shared_ptr<ftl::protocol::Stream> getStream(const std::string &uri);
 
 }
diff --git a/include/ftl/protocol/self.hpp b/include/ftl/protocol/self.hpp
index 2aadeae..80e1273 100644
--- a/include/ftl/protocol/self.hpp
+++ b/include/ftl/protocol/self.hpp
@@ -11,6 +11,7 @@
 #include <ftl/handle.hpp>
 
 #include <memory>
+#include <string>
 
 namespace ftl {
 namespace net {
@@ -19,6 +20,9 @@ class Universe;
 
 namespace protocol {
 
+class Node;
+class Stream;
+
 struct Error {
 	int errno;
 };
@@ -28,6 +32,12 @@ class Self {
 	/** Peer for outgoing connection: resolve address and connect */
 	explicit Self(const std::shared_ptr<ftl::net::Universe> &impl);
 	virtual ~Self();
+
+	std::shared_ptr<ftl::protocol::Node> connectNode(const std::string &uri);
+
+	std::shared_ptr<ftl::protocol::Stream> createStream(const std::string &uri);
+
+	std::shared_ptr<ftl::protocol::Stream> getStream(const std::string &uri);
 	
 	void start();
 	
diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp
index bb8ca62..21b6ef7 100644
--- a/include/ftl/protocol/streams.hpp
+++ b/include/ftl/protocol/streams.hpp
@@ -115,6 +115,8 @@ class Stream {
 	 */
 	inline size_t size() const { return state_.size(); }
 
+	virtual bool enable(uint8_t fs, uint8_t f) { return true; }
+
 	protected:
 	ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::Packet&> cb_;
 
diff --git a/include/ftl/threads.hpp b/include/ftl/threads.hpp
index 522d1cc..33eade4 100644
--- a/include/ftl/threads.hpp
+++ b/include/ftl/threads.hpp
@@ -47,7 +47,7 @@ namespace threads {
  * scope. */
 class _write_lock {
 public:
-	_write_lock(std::shared_mutex& mtx) : mtx_(&mtx) {
+	explicit _write_lock(std::shared_mutex& mtx) : mtx_(&mtx) {
 		mtx_->unlock_shared();
 		mtx_->lock();
 	}
diff --git a/src/peer.cpp b/src/peer.cpp
index 2a44aab..bbca9dd 100644
--- a/src/peer.cpp
+++ b/src/peer.cpp
@@ -92,7 +92,7 @@ void Peer::_process_handshake(uint64_t magic, uint32_t version, UUID pid) {
 		if (version != ftl::net::kVersion) LOG(WARNING) << "net protocol using different versions!";
 
 		LOG(INFO) << "(" << (outgoing_ ? "connecting" : "listening")
-				  << " peer) handshake received from remote";
+				  << " peer) handshake received from remote for " << pid.to_string();
 
 		status_ = NodeStatus::kConnected;
 		version_ = version;
diff --git a/src/protocol.cpp b/src/protocol.cpp
index 60091c0..c0092db 100644
--- a/src/protocol.cpp
+++ b/src/protocol.cpp
@@ -19,18 +19,24 @@ std::shared_ptr<ftl::protocol::Self> ftl::getSelf() {
 }
 
 std::shared_ptr<ftl::protocol::Self> ftl::createDummySelf() {
-    return std::make_shared<ftl::protocol::Self>(std::make_shared<ftl::net::Universe>());
+    ftl::UUID uuid;
+    auto u = std::make_shared<ftl::net::Universe>();
+    u->setLocalID(uuid);
+    return std::make_shared<ftl::protocol::Self>(u);
 }
 
 /*std::shared_ptr<ftl::protocol::Service> ftl::setServiceProvider(const std::string &uri) {
 
 }*/
 
-std::shared_ptr<ftl::protocol::Node> ftl::createNode(const std::string &uri) {
-    if (!universe) universe = std::make_shared<ftl::net::Universe>();
-    return std::make_shared<ftl::protocol::Node>(universe->connect(uri));
+std::shared_ptr<ftl::protocol::Node> ftl::connectNode(const std::string &uri) {
+    return getSelf()->connectNode(uri);
 }
 
-/*std::shared_ptr<ftl::protocol::Stream> ftl::createStream(const std::string &uri) {
+std::shared_ptr<ftl::protocol::Stream> ftl::createStream(const std::string &uri) {
+    return getSelf()->createStream(uri);
+}
 
-}*/
+std::shared_ptr<ftl::protocol::Stream> ftl::getStream(const std::string &uri) {
+    return getSelf()->getStream(uri);
+}
diff --git a/src/self.cpp b/src/self.cpp
index 63bdf72..c027e1d 100644
--- a/src/self.cpp
+++ b/src/self.cpp
@@ -1,11 +1,42 @@
 #include "universe.hpp"
 #include <ftl/protocol/self.hpp>
+#include "./streams/netstream.hpp"
 
 using ftl::protocol::Self;
 
 Self::Self(const std::shared_ptr<ftl::net::Universe> &impl): universe_(impl) {}
 
 Self::~Self() {}
+
+std::shared_ptr<ftl::protocol::Node> Self::connectNode(const std::string &uri) {
+    return std::make_shared<ftl::protocol::Node>(universe_->connect(uri));
+}
+
+std::shared_ptr<ftl::protocol::Stream> Self::createStream(const std::string &uri) {
+    ftl::URI u(uri);
+
+    if (!u.isValid()) throw FTL_Error("Invalid Stream URI");
+
+    switch (u.getScheme()) {
+    case ftl::URI::SCHEME_FTL   : return std::make_shared<ftl::protocol::Net>(uri, universe_.get(), true); 
+    case ftl::URI::SCHEME_FILE  :
+    case ftl::URI::SCHEME_NONE  :
+    default                     : throw FTL_Error("Invalid Stream URI");
+    }
+}
+
+std::shared_ptr<ftl::protocol::Stream> Self::getStream(const std::string &uri) {
+    ftl::URI u(uri);
+
+    if (!u.isValid()) throw FTL_Error("Invalid Stream URI");
+
+    switch (u.getScheme()) {
+    case ftl::URI::SCHEME_FTL   : return std::make_shared<ftl::protocol::Net>(uri, universe_.get(), false); 
+    case ftl::URI::SCHEME_FILE  :
+    case ftl::URI::SCHEME_NONE  :
+    default                     : throw FTL_Error("Invalid Stream URI");
+    }
+}
 	
 void Self::start() {
     universe_->start();
diff --git a/src/broadcaster.cpp b/src/streams/broadcaster.cpp
similarity index 100%
rename from src/broadcaster.cpp
rename to src/streams/broadcaster.cpp
diff --git a/src/muxer.cpp b/src/streams/muxer.cpp
similarity index 100%
rename from src/muxer.cpp
rename to src/streams/muxer.cpp
diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp
new file mode 100644
index 0000000..3b35584
--- /dev/null
+++ b/src/streams/netstream.cpp
@@ -0,0 +1,510 @@
+#include "netstream.hpp"
+//#include "adaptive.hpp"
+#include <ftl/time.hpp>
+#include "packetMsgpack.hpp"
+
+#define LOGURU_REPLACE_GLOG 1
+#include <ftl/lib/loguru.hpp>
+
+#ifndef WIN32
+#include <unistd.h>
+#include <limits.h>
+#endif
+
+using ftl::protocol::Net;
+using ftl::protocol::NetStats;
+using ftl::protocol::StreamPacket;
+using ftl::protocol::PacketMSGPACK;
+using ftl::protocol::StreamPacketMSGPACK;
+using ftl::protocol::Packet;
+using ftl::protocol::Channel;
+using ftl::protocol::Codec;
+using ftl::protocol::kAllFrames;
+using ftl::protocol::kAllFramesets;
+using std::string;
+using std::optional;
+
+static constexpr int kFramesToRequest = 30;
+
+std::atomic_size_t Net::req_bitrate__ = 0;
+std::atomic_size_t Net::tx_bitrate__ = 0;
+std::atomic_size_t Net::rx_sample_count__ = 0;
+std::atomic_size_t Net::tx_sample_count__ = 0;
+int64_t Net::last_msg__ = 0;
+MUTEX Net::msg_mtx__;
+
+static std::list<std::string> net_streams;
+static std::atomic_flag has_bindings = ATOMIC_FLAG_INIT;
+static SHARED_MUTEX stream_mutex;
+
+Net::Net(const std::string &uri, ftl::net::Universe *net, bool host) :
+		active_(false), net_(net), clock_adjust_(0), last_ping_(0), uri_(uri), host_(host) {
+	
+	// First net stream needs to register these RPC handlers
+	//if (!has_bindings.test_and_set()) {
+		if (net_->isBound("find_stream")) net_->unbind("find_stream");
+		net_->bind("find_stream", [net = net_](const std::string &uri) -> optional<ftl::UUID> {
+			LOG(INFO) << "Request for stream: " << uri;
+
+			ftl::URI u1(uri);
+			std::string base = u1.getBaseURI();
+
+			SHARED_LOCK(stream_mutex, lk);
+			for (const auto &s : net_streams) {
+				ftl::URI u2(s);
+				// Don't compare query string components.
+				if (base == u2.getBaseURI()) {
+					return net->id();
+				}
+			}
+			return {};
+		});
+
+		if (net_->isBound("list_streams")) net_->unbind("list_streams");
+		net_->bind("list_streams", [this]() {
+			SHARED_LOCK(stream_mutex, lk);
+			return net_streams;
+		});
+	//}
+
+	last_frame_ = 0;
+	time_peer_ = ftl::UUID(0);
+
+	//abr_ = new ftl::stream::AdaptiveBitrate(std::max(0, std::min(255, value("bitrate", 64))));
+
+	bitrate_ = 200; //abr_->current();
+	//abr_->setMaxRate(static_cast<uint8_t>(std::max(0, std::min(255, value("max_bitrate", 200)))));
+	//on("bitrate", [this]() {
+	//	abr_->setMaxRate(static_cast<uint8_t>(std::max(0, std::min(255, value("max_bitrate", 200)))));
+	//});
+
+	/*abr_enabled_ = value("abr_enabled", false);
+	on("abr_enabled", [this]() {
+		abr_enabled_ = value("abr_enabled", false);
+		bitrate_ = (abr_enabled_) ?
+			abr_->current() :
+			static_cast<uint8_t>(std::max(0, std::min(255, value("bitrate", 64))));
+		tally_ = 0;
+	});*/
+
+	/*value("paused", false);
+	on("paused", [this]() {
+		paused_ = value("paused", false);
+		if (!paused_) {
+			reset();
+		}
+	});*/
+}
+
+Net::~Net() {
+	end();
+
+	// FIXME: Wait to ensure no net callbacks are active.
+	// Do something better than this
+	std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+	//delete abr_;
+}
+
+bool Net::post(const StreamPacket &spkt, const Packet &pkt) {
+	if (!active_) return false;
+	bool hasStale = false;
+
+	// Lock to prevent clients being added / removed
+	{
+		SHARED_LOCK(mutex_,lk);
+		available(spkt.frameSetID()) += spkt.channel;
+
+		// Map the frameset ID from a local one to a remote one
+		StreamPacketMSGPACK spkt_net = *((StreamPacketMSGPACK*)&spkt);
+		spkt_net.streamID = _localToRemoteFS(spkt.streamID);
+
+		PacketMSGPACK pkt_strip;
+		pkt_strip.codec = pkt.codec;
+		pkt_strip.bitrate = pkt.bitrate;
+		pkt_strip.frame_count = pkt.frame_count;
+		pkt_strip.flags = pkt.flags;
+
+		if (host_) {
+            LOG(INFO) << "Send to " << clients_.size() << " clients";
+			auto c = clients_.begin();
+			while (c != clients_.end()) {
+				auto &client = *c;
+
+				// Strip packet data if channel is not wanted by client
+				const bool strip = int(spkt.channel) < 32 && pkt.data.size() > 0 && ((1 << int(spkt.channel)) & client.channels) == 0;
+
+				try {
+					short pre_transmit_latency = short(ftl::time::get_time() - spkt.localTimestamp);
+
+					if (!net_->send(client.peerid,
+							base_uri_,
+							pre_transmit_latency,  // Time since timestamp for tx
+							spkt_net,
+							(strip) ? pkt_strip : *((PacketMSGPACK*)&pkt))) {
+
+						// Send failed so mark as client stream completed
+						client.txcount = client.txmax;
+					} else {
+						if (!strip && pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp);
+
+						// Count frame as completed only if last block and channel is 0
+						// FIXME: This is unreliable, colour might not exist etc.
+						if (spkt_net.streamID == 0 && spkt.frame_number == 0 && spkt.channel == Channel::kColour) ++client.txcount;
+					}
+				} catch(...) {
+					client.txcount = client.txmax;
+				}
+
+				if (client.txcount >= client.txmax) {
+					hasStale = true;
+				}
+				++c;
+			}
+		} else {
+			try {
+				short pre_transmit_latency = short(ftl::time::get_time() - spkt.localTimestamp);
+				if (!net_->send(*peer_,
+						base_uri_,
+						pre_transmit_latency,  // Time since timestamp for tx
+						spkt_net,
+						*((PacketMSGPACK*)&pkt))) {
+
+				}
+				if (pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp);
+			} catch(...) {
+				// TODO: Some disconnect error
+			}
+		}
+	}
+
+	if (hasStale) _cleanUp();
+
+	return true;
+}
+
+uint32_t Net::_localToRemoteFS(uint32_t fsid) {
+	if (fsid == 255) return 255;
+	local_fsid_ = fsid;
+	return 0;
+}
+
+uint32_t Net::_remoteToLocalFS(uint32_t fsid) {
+	return local_fsid_; //(fsid == 255) ? 255 : local_fsid_;
+}
+
+bool Net::begin() {
+	if (active_) return true;
+	//if (!get<string>("uri")) return false;
+
+	//uri_ = *get<string>("uri");
+
+	ftl::URI u(uri_);
+	if (!u.isValid() || !(u.getScheme() == ftl::URI::SCHEME_FTL)) return false;
+	base_uri_ = u.getBaseURI();
+
+	if (net_->isBound(base_uri_)) {
+		LOG(ERROR) << "Stream already exists! - " << uri_;
+		active_ = false;
+		return false;
+	}
+
+	// Add the RPC handler for the URI
+	net_->bind(base_uri_, [this](ftl::net::Peer &p, short ttimeoff, const StreamPacketMSGPACK &spkt_raw, const PacketMSGPACK &pkt) {
+		int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count();
+
+		if (!active_) return;
+		if (paused_) return;
+
+		StreamPacket spkt = spkt_raw;
+		spkt.localTimestamp = now - int64_t(ttimeoff);
+		spkt.hint_capability = 0;
+		spkt.hint_source_total = 0;
+		spkt.version = 4;
+		spkt.hint_peerid = p.localID();
+		// Map remote frameset ID to a local one.
+		spkt.streamID = _remoteToLocalFS(spkt.streamID);
+
+		// Manage recuring requests
+		if (!host_ && last_frame_ != spkt.timestamp) {
+			UNIQUE_LOCK(mutex_, lk);
+			if (last_frame_ != spkt.timestamp) {
+				int tf = spkt.timestamp - last_frame_;  // Milliseconds per frame
+				int tc = now - last_completion_;		// Milliseconds since last frame completed
+				last_completion_ = now;
+				bytes_received_ = 0;
+				last_frame_ = spkt.timestamp;
+
+				lk.unlock();
+
+				// Apply adaptive bitrate adjustment if needed
+				/*if (abr_enabled_) {
+					int new_bitrate = abr_->adjustment(tf, tc, pkt.bitrate);
+					if (new_bitrate != bitrate_) {
+						bitrate_ = new_bitrate;
+						tally_ = 0;  // Force request send
+					}
+				}*/
+
+				if (size() > spkt.frameSetID()) {
+					auto sel = selected(spkt.frameSetID());
+
+					// A change in channel selections, so send those requests now
+					if (sel != last_selected_) {
+						auto changed = sel - last_selected_;
+						last_selected_ = sel;
+
+						for (auto c : changed) {
+							_sendRequest(c, spkt.frameSetID(), kAllFrames, kFramesToRequest, 255);
+						}
+					}
+				}
+
+				// Are we close to reaching the end of our frames request?
+				if (tally_ <= 5) {
+					// Yes, so send new requests
+					for (size_t i = 0; i < size(); ++i) {
+						const auto &sel = selected(i);
+						
+						for (auto c : sel) {
+							_sendRequest(c, i, kAllFrames, kFramesToRequest, 255);
+						}
+					}
+					tally_ = kFramesToRequest;
+				} else {
+					--tally_;
+				}
+			}
+		}
+
+		bytes_received_ += pkt.data.size();
+		//time_at_last_ = now;
+
+		// If hosting and no data then it is a request for data
+		// Note: a non host can receive empty data, meaning data is available
+		// but that you did not request it
+		if (host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest)) {
+			_processRequest(p, spkt, pkt);
+		} else {
+			// FIXME: Allow availability to change...
+			available(spkt.frameSetID()) += spkt.channel;
+		}
+
+		cb_.trigger(spkt, pkt);
+		if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
+	});
+
+	if (host_) {
+		LOG(INFO) << "Hosting stream: " << uri_;
+
+		// Alias the URI to the configurable if not already
+		// Allows the URI to be used to get config data.
+		/*if (ftl::config::find(uri_) == nullptr) {
+			ftl::config::alias(uri_, this);
+		}*/
+
+		{
+			// Add to list of available streams
+			UNIQUE_LOCK(stream_mutex, lk);
+			net_streams.push_back(uri_);
+		}
+
+		// Automatically set name if missing
+		//if (!get<std::string>("name")) {
+			char hostname[1024] = {0};
+			#ifdef WIN32
+			DWORD size = 1024;
+			GetComputerName(hostname, &size);
+			#else
+			gethostname(hostname, 1024);
+			#endif
+
+			//set("name", std::string(hostname));
+		//}
+
+		net_->broadcast("add_stream", uri_);
+		active_ = true;
+		
+	} else {
+		
+		tally_ = kFramesToRequest;
+		active_ = true;
+	}
+
+	return true;
+}
+
+void Net::reset() {
+	UNIQUE_LOCK(mutex_, lk);
+
+	for (size_t i = 0; i < size(); ++i) {
+		auto sel = selected(i);
+		
+		for (auto c : sel) {
+			_sendRequest(c, i, kAllFrames, kFramesToRequest, 255, true);
+		}
+	}
+	tally_ = kFramesToRequest;
+}
+
+bool Net::enable(uint8_t fs, uint8_t f) {
+	if (host_) { return false; }
+
+	// not hosting, try to find peer now
+	// First find non-proxy version, then check for proxy version if no match
+	auto p = net_->findOne<ftl::UUID>("find_stream", uri_);
+
+	if (p) {
+		peer_ = *p;
+	} else {
+		// use webservice (if connected)
+		auto ws = net_->getWebService();
+		if (ws) {
+			peer_ = ws->id();
+		} else {
+			LOG(ERROR) << "Stream Peer not found";
+			return false;
+		}
+	}
+	
+	// TODO: check return value
+	net_->send(*peer_, "enable_stream", uri_, fs, 0);
+	_sendRequest(Channel::kColour, fs, kAllFrames, kFramesToRequest, 255, true);
+
+	return true;
+}
+
+bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate, bool doreset) {
+	if (!active_ || host_) return false;
+
+	PacketMSGPACK pkt = {
+		Codec::kAny,			// TODO: Allow specific codec requests
+		0,
+		count,
+		bitrate_,
+		0
+	};
+
+	uint8_t sflags = ftl::protocol::kFlagRequest;
+	if (doreset) sflags |= ftl::protocol::kFlagReset;
+
+	StreamPacketMSGPACK spkt = {
+		5,
+		ftl::time::get_time(),
+		frameset,
+		frames,
+		c,
+		sflags,
+		0,
+		0,
+		0
+	};
+
+	net_->send(*peer_, base_uri_, (short)0, spkt, pkt);
+	return true;
+}
+
+void Net::_cleanUp() {
+	UNIQUE_LOCK(mutex_,lk);
+	for (auto i=clients_.begin(); i!=clients_.end(); ++i) {
+		auto &client = *i;
+		if (client.txcount >= client.txmax) {
+			if (client.peerid == time_peer_) {
+				time_peer_ = ftl::UUID(0);
+			}
+			LOG(INFO) << "Remove peer: " << client.peerid.to_string();
+			i = clients_.erase(i);
+		}
+	}
+}
+
+/* Packets for specific framesets, frames and channels are requested in
+ * batches (max 255 unique frames by timestamp). Requests are in the form
+ * of packets that match the request except the data component is empty.
+ */
+bool Net::_processRequest(ftl::net::Peer &p, StreamPacket &spkt, const Packet &pkt) {
+	bool found = false;
+    LOG(INFO) << "processing request";
+
+	{
+		SHARED_LOCK(mutex_,lk);
+		// Does the client already exist
+		for (auto &c : clients_) {
+			if (c.peerid == p.id()) {
+				// Yes, so reset internal request counters
+				c.txcount = 0;
+				c.txmax = static_cast<int>(pkt.frame_count);
+				if (int(spkt.channel) < 32) c.channels |= 1 << int(spkt.channel);
+				found = true;
+				// break;
+			}
+		}
+	}
+
+	// No existing client, so add a new one.
+	if (!found) {
+		{
+			UNIQUE_LOCK(mutex_,lk);
+
+			auto &client = clients_.emplace_back();
+			client.peerid = p.id();
+			client.quality = 255;  // TODO: Use quality given in packet
+			client.txcount = 0;
+			client.txmax = static_cast<int>(pkt.frame_count);
+			if (int(spkt.channel) < 32) client.channels |= 1 << int(spkt.channel);
+		}
+
+		spkt.hint_capability |= ftl::protocol::kStreamCap_NewConnection;
+
+		try {
+			connect_cb_.trigger(&p);
+		} catch (const ftl::exception &e) {
+			LOG(ERROR) << "Exception in stream connect callback: " << e.what();
+		}
+	}
+
+    LOG(INFO) << "Request processed";
+
+	return false;
+}
+
+void Net::_checkRXRate(size_t rx_size, int64_t rx_latency, int64_t ts) {
+	req_bitrate__ += rx_size * 8;
+	rx_sample_count__ += 1;
+}
+
+void Net::_checkTXRate(size_t tx_size, int64_t tx_latency, int64_t ts) {
+	tx_bitrate__ += tx_size * 8;
+	tx_sample_count__ += 1;
+}
+
+NetStats Net::getStatistics() {
+	int64_t ts = ftl::time::get_time();
+	UNIQUE_LOCK(msg_mtx__,lk);
+	const float r = (float(req_bitrate__) / float(ts - last_msg__) * 1000.0f / 1048576.0f);
+	const float t = (float(tx_bitrate__) / float(ts - last_msg__) * 1000.0f / 1048576.0f);
+	last_msg__ = ts;
+	req_bitrate__ = 0;
+	tx_bitrate__ = 0;
+	rx_sample_count__ = 0;
+	tx_sample_count__ = 0;
+	return {r, t};
+}
+
+bool Net::end() {
+	if (!active_) return false;
+
+	{
+		UNIQUE_LOCK(stream_mutex, lk);
+		auto i = std::find(net_streams.begin(), net_streams.end(), uri_);
+		if (i != net_streams.end()) net_streams.erase(i);
+	}
+
+	active_ = false;
+	net_->unbind(base_uri_);
+	return true;
+}
+
+bool Net::active() {
+	return active_;
+}
diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp
new file mode 100644
index 0000000..04f97f7
--- /dev/null
+++ b/src/streams/netstream.hpp
@@ -0,0 +1,118 @@
+#pragma once
+
+#include "../universe.hpp"
+#include <ftl/threads.hpp>
+#include <ftl/protocol/packet.hpp>
+#include <ftl/protocol/streams.hpp>
+#include <ftl/handle.hpp>
+#include <string>
+
+namespace ftl {
+namespace protocol {
+
+class AdaptiveBitrate;
+
+namespace detail {
+struct StreamClient {
+	ftl::UUID peerid;
+	std::atomic<int> txcount;			// Frames sent since last request
+	int txmax;							// Frames to send in request
+	std::atomic<uint32_t> channels;		// A channel mask, those that have been requested
+	uint8_t quality;
+};
+}
+
+/**
+ * The maximum number of frames a client can request in a single request.
+ */
+static const int kMaxFrames = 100;
+
+struct NetStats {
+	float rxRate;
+	float txRate;
+};
+
+/**
+ * Send and receive packets over a network. This class manages the connection
+ * of clients or the discovery of a stream and deals with bitrate adaptations.
+ * Each packet post is forwarded to each connected client that is still active.
+ */
+class Net : public Stream {
+	public:
+	Net(const std::string &uri, ftl::net::Universe *net, bool host=false);
+	~Net();
+
+	bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override;
+
+	bool begin() override;
+	bool end() override;
+	bool active() override;
+
+	bool enable(uint8_t fs, uint8_t f) override;
+
+	void reset() override;
+
+	inline const ftl::UUID &getPeer() const {
+		if (host_) { throw FTL_Error("Net::getPeer() not possible, hosting stream"); }
+		if (!peer_){ throw FTL_Error("steram::Net has no valid Peer. Not found earlier?"); }
+		return *peer_;
+	}
+
+	inline ftl::Handle onClientConnect(const std::function<bool(ftl::net::Peer*)> &cb) { return connect_cb_.on(cb); }
+
+	/**
+	 * Return the average bitrate of all streams since the last call to this
+	 * function. Units are Mbps.
+	 */
+	static NetStats getStatistics();
+
+private:
+	SHARED_MUTEX mutex_;
+	bool active_;
+	ftl::net::Universe *net_;
+	int64_t clock_adjust_;
+	ftl::UUID time_peer_;
+	std::optional<ftl::UUID> peer_;
+	int64_t last_frame_;
+	int64_t last_ping_;
+	std::string uri_;
+	std::string base_uri_;
+	const bool host_;
+	int tally_;
+	std::array<std::atomic<int>,32> reqtally_ = {0};
+	ftl::protocol::ChannelSet last_selected_;
+	uint8_t bitrate_=255;
+	std::atomic_int64_t bytes_received_ = 0;
+	int64_t last_completion_ = 0;
+	int64_t time_at_last_ = 0;
+	float required_bps_;
+	float actual_bps_;
+	bool abr_enabled_;
+	bool paused_ = false;
+
+	AdaptiveBitrate *abr_;
+
+	ftl::Handler<ftl::net::Peer*> connect_cb_;
+
+	uint32_t local_fsid_ = 0;
+
+	static std::atomic_size_t req_bitrate__;
+	static std::atomic_size_t tx_bitrate__;
+	static std::atomic_size_t rx_sample_count__;
+	static std::atomic_size_t tx_sample_count__;
+	static int64_t last_msg__;
+	static MUTEX msg_mtx__;
+
+	std::list<detail::StreamClient> clients_;
+
+	bool _processRequest(ftl::net::Peer &p, ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt);
+	void _checkRXRate(size_t rx_size, int64_t rx_latency, int64_t ts);
+	void _checkTXRate(size_t tx_size, int64_t tx_latency, int64_t ts);
+	bool _sendRequest(ftl::protocol::Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate, bool doreset=false);
+	void _cleanUp();
+	uint32_t _localToRemoteFS(uint32_t fsid);
+	uint32_t _remoteToLocalFS(uint32_t fsid);
+};
+
+}
+}
diff --git a/src/streams/packetMsgpack.hpp b/src/streams/packetMsgpack.hpp
new file mode 100644
index 0000000..673ed99
--- /dev/null
+++ b/src/streams/packetMsgpack.hpp
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <ftl/protocol/packet.hpp>
+#include <msgpack.hpp>
+
+MSGPACK_ADD_ENUM(ftl::protocol::Codec);
+MSGPACK_ADD_ENUM(ftl::protocol::Channel);
+
+namespace ftl {
+namespace protocol {
+
+struct StreamPacketMSGPACK : ftl::protocol::StreamPacket {
+    MSGPACK_DEFINE(timestamp, streamID, frame_number, channel, flags);
+};
+
+struct PacketMSGPACK : ftl::protocol::Packet {
+    MSGPACK_DEFINE(codec, reserved, frame_count, bitrate, flags, data);
+};
+
+}
+}
diff --git a/src/streams.cpp b/src/streams/streams.cpp
similarity index 100%
rename from src/streams.cpp
rename to src/streams/streams.cpp
diff --git a/src/universe.cpp b/src/universe.cpp
index 0ce6b5c..a5d907d 100644
--- a/src/universe.cpp
+++ b/src/universe.cpp
@@ -617,6 +617,11 @@ void Universe::_notifyConnect(Peer *p) {
 	// The peer could have been removed from valid peers already.
 	if (!ptr) return;
 
+	{
+		UNIQUE_LOCK(net_mutex_,lk);
+		peer_ids_[ptr->id()] = ptr->local_id_;
+	}
+
 	on_connect_.triggerAsync(ptr);
 }
 
diff --git a/src/universe.hpp b/src/universe.hpp
index 0f8d999..4d93208 100644
--- a/src/universe.hpp
+++ b/src/universe.hpp
@@ -237,7 +237,7 @@ template <typename... ARGS>
 void Universe::broadcast(const std::string &name, ARGS... args) {
 	SHARED_LOCK(net_mutex_,lk);
 	for (auto &p : peers_) {
-		if (!p->waitConnection()) continue;
+		if (!p || !p->waitConnection()) continue;
 		p->send(name, args...);
 	}
 }
@@ -266,7 +266,7 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
 	{
 		SHARED_LOCK(net_mutex_,lk);
 		for (auto &p : peers_) {
-			if (!p->waitConnection()) continue;
+			if (!p || !p->waitConnection()) continue;
 			p->asyncCall<std::optional<R>>(name, handler, args...);
 		}
 	}
@@ -303,7 +303,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) {
 	{
 		SHARED_LOCK(net_mutex_,lk);
 		for (auto &p : peers_) {
-			if (!p->waitConnection()) continue;
+			if (!p || !p->waitConnection()) continue;
 			++sdata->sentcount;
 			p->asyncCall<std::vector<R>>(name, handler, args...);
 		}
@@ -316,7 +316,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) {
 
 template <typename R, typename... ARGS>
 R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
-	Peer *p = getPeer(pid);
+	std::shared_ptr<Peer> p = getPeer(pid);
 	if (p == nullptr || !p->isConnected()) {
 		if (p == nullptr) throw FTL_Error("Attempting to call an unknown peer : " << pid.to_string());
 		else throw FTL_Error("Attempting to call an disconnected peer : " << pid.to_string());
@@ -326,7 +326,7 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
 
 template <typename R, typename... ARGS>
 int Universe::asyncCall(const ftl::UUID &pid, const std::string &name, std::function<void(const R&)> cb, ARGS... args) {
-	Peer *p = getPeer(pid);
+	std::shared_ptr<Peer> p = getPeer(pid);
 	if (p == nullptr || !p->isConnected()) {
 		if (p == nullptr) throw FTL_Error("Attempting to call an unknown peer : " << pid.to_string());
 		else throw FTL_Error("Attempting to call an disconnected peer : " << pid.to_string());
@@ -336,7 +336,7 @@ int Universe::asyncCall(const ftl::UUID &pid, const std::string &name, std::func
 
 template <typename... ARGS>
 bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) {
-	Peer *p = getPeer(pid);
+	std::shared_ptr<Peer> p = getPeer(pid);
 	if (p == nullptr) {
 		LOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string();
 		return false;
@@ -347,7 +347,7 @@ bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args)
 
 template <typename... ARGS>
 int Universe::try_send(const ftl::UUID &pid, const std::string &name, ARGS... args) {
-	Peer *p = getPeer(pid);
+	std::shared_ptr<Peer> p = getPeer(pid);
 	if (p == nullptr) {
 		//DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string();
 		return false;
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index c770412..d140b40 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -59,4 +59,15 @@ target_include_directories(stream_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../in
 target_link_libraries(stream_unit
 	beyond-protocol ${URIPARSER_LIBRARIES} ${OS_LIBS})
 
-add_test(StreamUnitTest stream_unit)
\ No newline at end of file
+add_test(StreamUnitTest stream_unit)
+
+### Stream Integration #########################################################
+add_executable(stream_integration
+	$<TARGET_OBJECTS:CatchTestFTL>
+	./stream_integration.cpp
+)
+target_include_directories(stream_integration PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include")
+target_link_libraries(stream_integration
+	beyond-protocol GnuTLS::GnuTLS Threads::Threads ${URIPARSER_LIBRARIES} ${UUID_LIBRARIES} ${OS_LIBS})
+
+add_test(StreamIntegrationTest stream_integration)
\ No newline at end of file
diff --git a/test/net_integration.cpp b/test/net_integration.cpp
index e30c0aa..68f0fb9 100644
--- a/test/net_integration.cpp
+++ b/test/net_integration.cpp
@@ -34,7 +34,7 @@ TEST_CASE("Listen and Connect", "[net]") {
 	SECTION("valid tcp connection using ipv4") {
 		auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
 		LOG(INFO) << uri;
-		auto p = ftl::createNode(uri);
+		auto p = ftl::connectNode(uri);
 		REQUIRE( p );
 		
 		REQUIRE( p->waitConnection(5) );
@@ -45,7 +45,7 @@ TEST_CASE("Listen and Connect", "[net]") {
 
 	SECTION("valid tcp connection using hostname") {
 		auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort());
-		auto p = ftl::createNode(uri);
+		auto p = ftl::connectNode(uri);
 		REQUIRE( p );
 		
 		REQUIRE( p->waitConnection(5) );
@@ -57,7 +57,7 @@ TEST_CASE("Listen and Connect", "[net]") {
 	SECTION("invalid protocol") {
 		bool throws = false;
 		try {
-			auto p = ftl::createNode("http://localhost:1234");
+			auto p = ftl::connectNode("http://localhost:1234");
 		}
 		catch (const ftl::exception& ex) {
 			ex.ignore();
@@ -73,7 +73,7 @@ TEST_CASE("Listen and Connect", "[net]") {
 
 		auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort());
 
-		auto p_connecting = ftl::createNode(uri);
+		auto p_connecting = ftl::connectNode(uri);
 		REQUIRE(p_connecting);
 		
 		bool disconnected_once = false;
@@ -102,7 +102,7 @@ TEST_CASE("Listen and Connect", "[net]") {
 
 		auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort());
 
-		auto p_connecting = ftl::createNode(uri);
+		auto p_connecting = ftl::connectNode(uri);
 		REQUIRE(p_connecting);
 		
 		bool disconnected_once = false;
@@ -143,7 +143,7 @@ TEST_CASE("Self::onConnect()", "[net]") {
 			return true;
 		});
 
-		REQUIRE( ftl::createNode(uri)->waitConnection(5) );
+		REQUIRE( ftl::connectNode(uri)->waitConnection(5) );
 
 		bool result = try_for(20, [&done]{ return done; });
 		REQUIRE( result );
@@ -157,7 +157,7 @@ TEST_CASE("Self::onConnect()", "[net]") {
 			return true;
 		});
 
-		REQUIRE( ftl::createNode(uri)->waitConnection(5) );
+		REQUIRE( ftl::connectNode(uri)->waitConnection(5) );
 
 		REQUIRE( done );
 	}
diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp
new file mode 100644
index 0000000..bb9dd98
--- /dev/null
+++ b/test/stream_integration.cpp
@@ -0,0 +1,65 @@
+#include "catch.hpp"
+#include <ftl/protocol.hpp>
+#include <ftl/protocol/self.hpp>
+#include <ftl/protocol/streams.hpp>
+#include <ftl/uri.hpp>
+#include <ftl/exception.hpp>
+#include <ftl/protocol/node.hpp>
+
+// --- Tests -------------------------------------------------------------------
+
+TEST_CASE("TCP Stream", "[net]") {
+	std::mutex mtx;
+
+    ftl::protocol::reset();
+
+	auto self = ftl::createDummySelf();
+	self->listen(ftl::URI("tcp://localhost:0")); 
+
+	auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
+	LOG(INFO) << uri;
+	auto p = ftl::connectNode(uri);
+	p->waitConnection(5);
+
+	SECTION("single enabled packet stream") {
+		std::condition_variable cv;
+		std::unique_lock<std::mutex> lk(mtx);
+
+		auto s1 = ftl::createStream("ftl://mystream");
+		REQUIRE( s1 );
+
+		auto s2 = self->getStream("ftl://mystream");
+		REQUIRE( s2 );
+
+		ftl::protocol::Packet rpkt;
+
+		auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) {
+			rpkt = pkt;
+			cv.notify_one();
+			return true;
+		});
+
+		s1->begin();
+		s2->begin();
+
+		s2->enable(0, 0);
+
+		// TODO: Find better option
+		std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+		ftl::protocol::StreamPacket spkt;
+		spkt.streamID = 0;
+		spkt.frame_number = 0;
+		spkt.channel = ftl::protocol::Channel::kColour;
+		ftl::protocol::Packet pkt;
+		pkt.bitrate = 10;
+		pkt.codec = ftl::protocol::Codec::kJPG;
+		pkt.frame_count = 1;
+		s1->post(spkt, pkt);
+
+		REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout);
+		REQUIRE( rpkt.bitrate == 10 );
+		REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG );
+		REQUIRE( rpkt.frame_count == 1 );
+	}
+}
-- 
GitLab