Skip to content
Snippets Groups Projects
Commit 6c08c53c authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Fixes #77 multiple ws stream clients

parent 15dbd484
No related branches found
No related tags found
Loading
...@@ -11,6 +11,76 @@ let peer_uris = {}; ...@@ -11,6 +11,76 @@ let peer_uris = {};
let uri_data = {}; let uri_data = {};
function RGBDClient(peer, N, rate, dest) {
this.peer = peer;
this.txmax = N;
this.rate = rate;
this.dest = dest;
this.txcount = 0;
}
RGBDClient.prototype.push = function(uri, rgb, depth) {
this.peer.send(uri, rgb, depth);
this.txcount++;
}
function RGBDStream(uri, peer) {
this.uri = uri;
this.peer = peer;
this.title = "";
this.rgb = null;
this.depth = null;
this.pose = null;
this.clients = [];
this.rxcount = 10;
this.rxmax = 10;
peer.bind(uri, (rgb, depth) => {
this.pushFrames(rgb, depth);
this.rxcount++;
if (this.rxcount >= this.rxmax && this.clients.length > 0) {
this.subscribe();
}
});
}
RGBDStream.prototype.addClient = function(peer, N, rate, dest) {
// TODO(Nick) Verify that it isn't already in list...
for (let i=0; i<this.clients.length; i++) {
if (this.clients[i].peer.string_id == peer.string_id) return;
}
this.clients.push(new RGBDClient(peer, N, rate, dest));
if (this.rxcount >= this.rxmax) {
this.subscribe();
}
}
RGBDStream.prototype.subscribe = function() {
this.rxcount = 0;
this.rxmax = 10;
console.log("Subscribe to ", this.uri);
this.peer.send("get_stream", this.uri, 10, 0, [Peer.uuid], this.uri);
}
RGBDStream.prototype.pushFrames = function(rgb, depth) {
this.rgb = rgb;
this.depth = depth;
for (let i=0; i < this.clients.length; i++) {
this.clients[i].push(this.uri, rgb, depth);
}
let i=0;
while (i < this.clients.length) {
if (this.clients[i].txcount >= this.clients[i].txmax) {
console.log("remove client");
this.clients.splice(i, 1);
} else i++;
}
}
// ---- PROTOCOL --------------------------------------------------------------- // ---- PROTOCOL ---------------------------------------------------------------
app.get('/', (req, res) => { app.get('/', (req, res) => {
...@@ -49,13 +119,7 @@ function checkStreams(peer) { ...@@ -49,13 +119,7 @@ function checkStreams(peer) {
//uri_to_peer[streams[i]] = peer; //uri_to_peer[streams[i]] = peer;
peer_uris[peer.string_id].push(streams[i]); peer_uris[peer.string_id].push(streams[i]);
uri_data[streams[i]] = { uri_data[streams[i]] = new RGBDStream(streams[i], peer);
peer: peer,
title: "",
rgb: null,
depth: null,
pose: null
};
} }
}); });
} }
...@@ -144,13 +208,8 @@ app.ws('/', (ws, req) => { ...@@ -144,13 +208,8 @@ app.ws('/', (ws, req) => {
p.bind("get_stream", (uri, N, rate, pid, dest) => { p.bind("get_stream", (uri, N, rate, pid, dest) => {
let peer = uri_data[uri].peer; let peer = uri_data[uri].peer;
if (peer) { if (peer) {
// FIXME (NICK) BUG HERE, can't have multiple peers listening to same stream... uri_data[uri].addClient(p, N, rate, dest);
peer.bind(uri, (rgb, depth) => { //peer.send("get_stream", uri, N, rate, [Peer.uuid], dest);
uri_data[uri].rgb = rgb;
uri_data[uri].depth = depth;
p.send(uri, rgb, depth);
});
peer.send("get_stream", uri, N, rate, [Peer.uuid], dest);
} }
}); });
...@@ -159,13 +218,7 @@ app.ws('/', (ws, req) => { ...@@ -159,13 +218,7 @@ app.ws('/', (ws, req) => {
//uri_to_peer[streams[i]] = peer; //uri_to_peer[streams[i]] = peer;
peer_uris[p.string_id].push(uri); peer_uris[p.string_id].push(uri);
uri_data[uri] = { uri_data[uri] = new RGBDStream(uri, p);
peer: p,
title: "",
rgb: null,
depth: null,
pose: null
};
broadcastExcept(p, "add_stream", uri); broadcastExcept(p, "add_stream", uri);
}); });
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment