Skip to content
Snippets Groups Projects
Commit ab504be8 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Refactor to rgbd streams in webservice

parent 15dbd484
No related branches found
No related tags found
1 merge request!30Fixes #77 multiple ws stream clients
Pipeline #11409 passed
...@@ -11,6 +11,78 @@ let peer_uris = {}; ...@@ -11,6 +11,78 @@ let peer_uris = {};
let uri_data = {}; let uri_data = {};
function RGBDClient(peer, N, rate, dest) {
this.peer = peer;
this.txmax = N;
this.rate = rate;
this.dest = dest;
this.txcount = 0;
}
RGBDClient.prototype.push = function(uri, rgb, depth) {
this.peer.send(uri, rgb, depth);
this.txcount++;
}
function RGBDStream(uri, peer) {
this.uri = uri;
this.peer = peer;
this.title = "";
this.rgb = null;
this.depth = null;
this.pose = null;
this.clients = [];
this.rxcount = 10;
this.rxmax = 10;
peer.bind(uri, (rgb, depth) => {
this.pushFrames(rgb, depth);
this.rxcount++;
if (this.rxcount >= this.rxmax && this.clients.length > 0) {
this.subscribe();
}
});
}
RGBDStream.prototype.addClient = function(peer, N, rate, dest) {
// TODO(Nick) Verify that it isn't already in list...
for (let i=0; i<this.clients.length; i++) {
if (this.clients[i].peer.string_id == peer.string_id) return;
}
this.clients.push(new RGBDClient(peer, N, rate, dest));
if (this.rxcount >= this.rxmax) {
this.subscribe();
}
}
RGBDStream.prototype.subscribe = function() {
this.rxcount = 0;
this.rxmax = 10;
console.log("Subscribe to ", this.uri);
this.peer.send("get_stream", this.uri, 10, 0, [Peer.uuid], this.uri);
}
RGBDStream.prototype.pushFrames = function(rgb, depth) {
this.rgb = rgb;
this.depth = depth;
for (let i=0; i < this.clients.length; i++) {
console.log("Push frame");
this.clients[i].push(this.uri, rgb, depth);
}
let i=0;
while (i < this.clients.length) {
console.log("TX COUNT", this.clients[i].txmax);
if (this.clients[i].txcount >= this.clients[i].txmax) {
console.log("remove client");
this.clients.splice(i, 1);
} else i++;
}
}
// ---- PROTOCOL --------------------------------------------------------------- // ---- PROTOCOL ---------------------------------------------------------------
app.get('/', (req, res) => { app.get('/', (req, res) => {
...@@ -49,13 +121,7 @@ function checkStreams(peer) { ...@@ -49,13 +121,7 @@ function checkStreams(peer) {
//uri_to_peer[streams[i]] = peer; //uri_to_peer[streams[i]] = peer;
peer_uris[peer.string_id].push(streams[i]); peer_uris[peer.string_id].push(streams[i]);
uri_data[streams[i]] = { uri_data[streams[i]] = new RGBDStream(streams[i], peer);
peer: peer,
title: "",
rgb: null,
depth: null,
pose: null
};
} }
}); });
} }
...@@ -144,13 +210,8 @@ app.ws('/', (ws, req) => { ...@@ -144,13 +210,8 @@ app.ws('/', (ws, req) => {
p.bind("get_stream", (uri, N, rate, pid, dest) => { p.bind("get_stream", (uri, N, rate, pid, dest) => {
let peer = uri_data[uri].peer; let peer = uri_data[uri].peer;
if (peer) { if (peer) {
// FIXME (NICK) BUG HERE, can't have multiple peers listening to same stream... uri_data[uri].addClient(p, N, rate, dest);
peer.bind(uri, (rgb, depth) => { //peer.send("get_stream", uri, N, rate, [Peer.uuid], dest);
uri_data[uri].rgb = rgb;
uri_data[uri].depth = depth;
p.send(uri, rgb, depth);
});
peer.send("get_stream", uri, N, rate, [Peer.uuid], dest);
} }
}); });
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment