diff --git a/web-service/src/index.js b/web-service/src/index.js index 8fbba0ad66f458b6ad98e91c17364ae607553e6b..f4096ecdcb17b32510a3baa6e74245771a49e02f 100644 --- a/web-service/src/index.js +++ b/web-service/src/index.js @@ -11,6 +11,76 @@ let peer_uris = {}; let uri_data = {}; +function RGBDClient(peer, N, rate, dest) { + this.peer = peer; + this.txmax = N; + this.rate = rate; + this.dest = dest; + this.txcount = 0; +} + +RGBDClient.prototype.push = function(uri, rgb, depth) { + this.peer.send(uri, rgb, depth); + this.txcount++; +} + +function RGBDStream(uri, peer) { + this.uri = uri; + this.peer = peer; + this.title = ""; + this.rgb = null; + this.depth = null; + this.pose = null; + this.clients = []; + this.rxcount = 10; + this.rxmax = 10; + + peer.bind(uri, (rgb, depth) => { + this.pushFrames(rgb, depth); + this.rxcount++; + if (this.rxcount >= this.rxmax && this.clients.length > 0) { + this.subscribe(); + } + }); +} + +RGBDStream.prototype.addClient = function(peer, N, rate, dest) { + // TODO(Nick) Verify that it isn't already in list... + for (let i=0; i<this.clients.length; i++) { + if (this.clients[i].peer.string_id == peer.string_id) return; + } + + this.clients.push(new RGBDClient(peer, N, rate, dest)); + + if (this.rxcount >= this.rxmax) { + this.subscribe(); + } +} + +RGBDStream.prototype.subscribe = function() { + this.rxcount = 0; + this.rxmax = 10; + console.log("Subscribe to ", this.uri); + this.peer.send("get_stream", this.uri, 10, 0, [Peer.uuid], this.uri); +} + +RGBDStream.prototype.pushFrames = function(rgb, depth) { + this.rgb = rgb; + this.depth = depth; + + for (let i=0; i < this.clients.length; i++) { + this.clients[i].push(this.uri, rgb, depth); + } + + let i=0; + while (i < this.clients.length) { + if (this.clients[i].txcount >= this.clients[i].txmax) { + console.log("remove client"); + this.clients.splice(i, 1); + } else i++; + } +} + // ---- PROTOCOL --------------------------------------------------------------- app.get('/', (req, res) => { @@ -49,13 +119,7 @@ function checkStreams(peer) { //uri_to_peer[streams[i]] = peer; peer_uris[peer.string_id].push(streams[i]); - uri_data[streams[i]] = { - peer: peer, - title: "", - rgb: null, - depth: null, - pose: null - }; + uri_data[streams[i]] = new RGBDStream(streams[i], peer); } }); } @@ -144,13 +208,8 @@ app.ws('/', (ws, req) => { p.bind("get_stream", (uri, N, rate, pid, dest) => { let peer = uri_data[uri].peer; if (peer) { - // FIXME (NICK) BUG HERE, can't have multiple peers listening to same stream... - peer.bind(uri, (rgb, depth) => { - uri_data[uri].rgb = rgb; - uri_data[uri].depth = depth; - p.send(uri, rgb, depth); - }); - peer.send("get_stream", uri, N, rate, [Peer.uuid], dest); + uri_data[uri].addClient(p, N, rate, dest); + //peer.send("get_stream", uri, N, rate, [Peer.uuid], dest); } }); @@ -159,13 +218,7 @@ app.ws('/', (ws, req) => { //uri_to_peer[streams[i]] = peer; peer_uris[p.string_id].push(uri); - uri_data[uri] = { - peer: p, - title: "", - rgb: null, - depth: null, - pose: null - }; + uri_data[uri] = new RGBDStream(uri, p); broadcastExcept(p, "add_stream", uri); });