From 6c08c53c118be70d7f0f24f475ec67fa8f5777d7 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Wed, 12 Jun 2019 21:07:04 +0300
Subject: [PATCH] Fixes #77 multiple ws stream clients

---
 web-service/src/index.js | 95 +++++++++++++++++++++++++++++++---------
 1 file changed, 74 insertions(+), 21 deletions(-)

diff --git a/web-service/src/index.js b/web-service/src/index.js
index 8fbba0ad6..f4096ecdc 100644
--- a/web-service/src/index.js
+++ b/web-service/src/index.js
@@ -11,6 +11,76 @@ let peer_uris = {};
 
 let uri_data = {};
 
+function RGBDClient(peer, N, rate, dest) {
+	this.peer = peer;
+	this.txmax = N;
+	this.rate = rate;
+	this.dest = dest;
+	this.txcount = 0;
+}
+
+RGBDClient.prototype.push = function(uri, rgb, depth) {
+	this.peer.send(uri, rgb, depth);
+	this.txcount++;
+}
+
+function RGBDStream(uri, peer) {
+	this.uri = uri;
+	this.peer = peer;
+	this.title = "";
+	this.rgb = null;
+	this.depth = null;
+	this.pose = null;
+	this.clients = [];
+	this.rxcount = 10;
+	this.rxmax = 10;
+
+	peer.bind(uri, (rgb, depth) => {
+		this.pushFrames(rgb, depth);
+		this.rxcount++;
+		if (this.rxcount >= this.rxmax && this.clients.length > 0) {
+			this.subscribe();
+		}
+	});
+}
+
+RGBDStream.prototype.addClient = function(peer, N, rate, dest) {
+	// TODO(Nick) Verify that it isn't already in list...
+	for (let i=0; i<this.clients.length; i++) {
+		if (this.clients[i].peer.string_id == peer.string_id) return;
+	}
+
+	this.clients.push(new RGBDClient(peer, N, rate, dest));
+
+	if (this.rxcount >= this.rxmax) {
+		this.subscribe();
+	}
+}
+
+RGBDStream.prototype.subscribe = function() {
+	this.rxcount = 0;
+	this.rxmax = 10;
+	console.log("Subscribe to ", this.uri);
+	this.peer.send("get_stream", this.uri, 10, 0, [Peer.uuid], this.uri);
+}
+
+RGBDStream.prototype.pushFrames = function(rgb, depth) {
+	this.rgb = rgb;
+	this.depth = depth;
+
+	for (let i=0; i < this.clients.length; i++) {
+		this.clients[i].push(this.uri, rgb, depth);
+	}
+
+	let i=0;
+	while (i < this.clients.length) {
+		if (this.clients[i].txcount >= this.clients[i].txmax) {
+			console.log("remove client");
+			this.clients.splice(i, 1);
+		} else i++;
+	}
+}
+
 // ---- PROTOCOL ---------------------------------------------------------------
 
 app.get('/', (req, res) => {
@@ -49,13 +119,7 @@ function checkStreams(peer) {
 				//uri_to_peer[streams[i]] = peer;
 				peer_uris[peer.string_id].push(streams[i]);
 
-				uri_data[streams[i]] = {
-					peer: peer,
-					title: "",
-					rgb: null,
-					depth: null,
-					pose: null
-				};
+				uri_data[streams[i]] = new RGBDStream(streams[i], peer);
 			}
 		});
 	}
@@ -144,13 +208,8 @@ app.ws('/', (ws, req) => {
 	p.bind("get_stream", (uri, N, rate, pid, dest) => {
 		let peer = uri_data[uri].peer;
 		if (peer) {
-			// FIXME (NICK) BUG HERE, can't have multiple peers listening to same stream...
-			peer.bind(uri, (rgb, depth) => {
-				uri_data[uri].rgb = rgb;
-				uri_data[uri].depth = depth;
-				p.send(uri, rgb, depth);
-			});
-			peer.send("get_stream", uri, N, rate, [Peer.uuid], dest);
+			uri_data[uri].addClient(p, N, rate, dest);
+			//peer.send("get_stream", uri, N, rate, [Peer.uuid], dest);
 		}
 	});
 
@@ -159,13 +218,7 @@ app.ws('/', (ws, req) => {
 		//uri_to_peer[streams[i]] = peer;
 		peer_uris[p.string_id].push(uri);
 
-		uri_data[uri] = {
-			peer: p,
-			title: "",
-			rgb: null,
-			depth: null,
-			pose: null
-		};
+		uri_data[uri] = new RGBDStream(uri, p);
 
 		broadcastExcept(p, "add_stream", uri);
 	});
-- 
GitLab