Skip to content
Snippets Groups Projects

Fixes #77 multiple ws stream clients

Merged Nicolas Pope requested to merge bug/77/webmulti into master
1 file
+ 74
21
Compare changes
  • Side-by-side
  • Inline
+ 74
21
@@ -11,6 +11,76 @@ let peer_uris = {};
@@ -11,6 +11,76 @@ let peer_uris = {};
let uri_data = {};
let uri_data = {};
 
function RGBDClient(peer, N, rate, dest) {
 
this.peer = peer;
 
this.txmax = N;
 
this.rate = rate;
 
this.dest = dest;
 
this.txcount = 0;
 
}
 
 
RGBDClient.prototype.push = function(uri, rgb, depth) {
 
this.peer.send(uri, rgb, depth);
 
this.txcount++;
 
}
 
 
function RGBDStream(uri, peer) {
 
this.uri = uri;
 
this.peer = peer;
 
this.title = "";
 
this.rgb = null;
 
this.depth = null;
 
this.pose = null;
 
this.clients = [];
 
this.rxcount = 10;
 
this.rxmax = 10;
 
 
peer.bind(uri, (rgb, depth) => {
 
this.pushFrames(rgb, depth);
 
this.rxcount++;
 
if (this.rxcount >= this.rxmax && this.clients.length > 0) {
 
this.subscribe();
 
}
 
});
 
}
 
 
RGBDStream.prototype.addClient = function(peer, N, rate, dest) {
 
// TODO(Nick) Verify that it isn't already in list...
 
for (let i=0; i<this.clients.length; i++) {
 
if (this.clients[i].peer.string_id == peer.string_id) return;
 
}
 
 
this.clients.push(new RGBDClient(peer, N, rate, dest));
 
 
if (this.rxcount >= this.rxmax) {
 
this.subscribe();
 
}
 
}
 
 
RGBDStream.prototype.subscribe = function() {
 
this.rxcount = 0;
 
this.rxmax = 10;
 
console.log("Subscribe to ", this.uri);
 
this.peer.send("get_stream", this.uri, 10, 0, [Peer.uuid], this.uri);
 
}
 
 
RGBDStream.prototype.pushFrames = function(rgb, depth) {
 
this.rgb = rgb;
 
this.depth = depth;
 
 
for (let i=0; i < this.clients.length; i++) {
 
this.clients[i].push(this.uri, rgb, depth);
 
}
 
 
let i=0;
 
while (i < this.clients.length) {
 
if (this.clients[i].txcount >= this.clients[i].txmax) {
 
console.log("remove client");
 
this.clients.splice(i, 1);
 
} else i++;
 
}
 
}
 
// ---- PROTOCOL ---------------------------------------------------------------
// ---- PROTOCOL ---------------------------------------------------------------
app.get('/', (req, res) => {
app.get('/', (req, res) => {
@@ -49,13 +119,7 @@ function checkStreams(peer) {
@@ -49,13 +119,7 @@ function checkStreams(peer) {
//uri_to_peer[streams[i]] = peer;
//uri_to_peer[streams[i]] = peer;
peer_uris[peer.string_id].push(streams[i]);
peer_uris[peer.string_id].push(streams[i]);
uri_data[streams[i]] = {
uri_data[streams[i]] = new RGBDStream(streams[i], peer);
peer: peer,
title: "",
rgb: null,
depth: null,
pose: null
};
}
}
});
});
}
}
@@ -144,13 +208,8 @@ app.ws('/', (ws, req) => {
@@ -144,13 +208,8 @@ app.ws('/', (ws, req) => {
p.bind("get_stream", (uri, N, rate, pid, dest) => {
p.bind("get_stream", (uri, N, rate, pid, dest) => {
let peer = uri_data[uri].peer;
let peer = uri_data[uri].peer;
if (peer) {
if (peer) {
// FIXME (NICK) BUG HERE, can't have multiple peers listening to same stream...
uri_data[uri].addClient(p, N, rate, dest);
peer.bind(uri, (rgb, depth) => {
//peer.send("get_stream", uri, N, rate, [Peer.uuid], dest);
uri_data[uri].rgb = rgb;
uri_data[uri].depth = depth;
p.send(uri, rgb, depth);
});
peer.send("get_stream", uri, N, rate, [Peer.uuid], dest);
}
}
});
});
@@ -159,13 +218,7 @@ app.ws('/', (ws, req) => {
@@ -159,13 +218,7 @@ app.ws('/', (ws, req) => {
//uri_to_peer[streams[i]] = peer;
//uri_to_peer[streams[i]] = peer;
peer_uris[p.string_id].push(uri);
peer_uris[p.string_id].push(uri);
uri_data[uri] = {
uri_data[uri] = new RGBDStream(uri, p);
peer: p,
title: "",
rgb: null,
depth: null,
pose: null
};
broadcastExcept(p, "add_stream", uri);
broadcastExcept(p, "add_stream", uri);
});
});
Loading