diff --git a/web-service/src/index.js b/web-service/src/index.js index 1971f340dfa75ab34d9f8efc6968b7b38cffa68f..c6b37247ac0c0e63ad0eefbd8b81ad5594d6b432 100644 --- a/web-service/src/index.js +++ b/web-service/src/index.js @@ -11,31 +11,55 @@ let peer_uris = {}; let uri_data = {}; +/** + * A client stream request object. Each source maintains a list of clients who + * are wanting frames from that source. Clients can only request N frames at a + * time, after that if no new request is received then the client is removed. + * + * @param {Peer} peer Peer websocket wrapper + * @param {number} N Number of frames requested + * @param {number} rate Bitrate index requested + * @param {string} dest URI destination + */ function RGBDClient(peer, N, rate, dest) { this.peer = peer; - this.txmax = N*16; + this.txmax = N*16; // 16 is for 16 blocks per frame... this will change in the near future this.rate = rate; this.dest = dest; this.txcount = 0; } +/** + * Actually send a frame over network to the client. + */ RGBDClient.prototype.push = function(uri, frame, ttime, chunk, rgb, depth) { this.peer.send(uri, frame, ttime, chunk, rgb, depth); this.txcount++; } +/** + * A video stream. Each peer provides a list of these streams. Each stream can + * receive frames from the source and forward those frames to any clients. + * Therefore each of these stream objects maintains a list of clients and + * loops over them whenever a new frame is received. + * + * @param {string} uri Address of the stream + * @param {Peer} peer Origin of the stream + */ function RGBDStream(uri, peer) { this.uri = uri; this.peer = peer; this.title = ""; - this.rgb = null; - this.depth = null; + this.rgb = null; // TODO: No longer works as an image + this.depth = null; // TODO: No longer works as an image this.pose = null; this.clients = []; this.rxcount = 10; this.rxmax = 10; + // Add RPC handler to receive frames from the source peer.bind(uri, (frame, ttime, chunk, rgb, depth) => { + // Forward frames to all clients this.pushFrames(frame, ttime, chunk, rgb, depth); this.rxcount++; if (this.rxcount >= this.rxmax && this.clients.length > 0) { @@ -176,6 +200,7 @@ app.ws('/', (ws, req) => { checkStreams(p); }); + // Used to sync clocks p.bind("__ping__", () => { return Date.now(); }); @@ -198,6 +223,7 @@ app.ws('/', (ws, req) => { } }); + // Requests camera calibration information p.proxy("source_details", (cb, uri, chan) => { let peer = uri_data[uri].peer; if (peer) { @@ -205,6 +231,7 @@ app.ws('/', (ws, req) => { } }); + // Get the current position of a camera p.proxy("get_pose", (cb, uri) => { //console.log("SET POSE"); let peer = uri_data[uri].peer; @@ -213,6 +240,7 @@ app.ws('/', (ws, req) => { } }); + // Change the position of a camera p.bind("set_pose", (uri, vec) => { let peer = uri_data[uri].peer; if (peer) { @@ -221,6 +249,7 @@ app.ws('/', (ws, req) => { } }); + // Request from frames from a source p.bind("get_stream", (uri, N, rate, pid, dest) => { let peer = uri_data[uri].peer; if (peer) { @@ -229,6 +258,7 @@ app.ws('/', (ws, req) => { } }); + // Register a new stream p.bind("add_stream", (uri) => { console.log("Adding stream: ", uri); //uri_to_peer[streams[i]] = peer; diff --git a/web-service/src/peer.js b/web-service/src/peer.js index 26a65782bdeeea5aaa295e9b15f1e7735958ef89..51cd78e6a9ad07ec17f478646b3b60171e6280bc 100644 --- a/web-service/src/peer.js +++ b/web-service/src/peer.js @@ -6,6 +6,7 @@ const kConnecting = 1; const kConnected = 2; const kDisconnected = 3; +// Generate a unique id for this webservice let my_uuid = new Uint8Array(16); my_uuid[0] = 44; my_uuid = Buffer.from(my_uuid); @@ -13,6 +14,10 @@ my_uuid = Buffer.from(my_uuid); const kMagic = 0x0009340053640912; const kVersion = 0; +/** + * Wrap a web socket with a MsgPack RCP protocol that works with our C++ version. + * @param {websocket} ws Websocket object + */ function Peer(ws) { this.sock = ws; this.status = kConnecting; @@ -79,6 +84,9 @@ function Peer(ws) { Peer.uuid = my_uuid; +/** + * @private + */ Peer.prototype._dispatchNotification = function(name, args) { if (this.bindings.hasOwnProperty(name)) { //console.log("Notification for: ", name); @@ -88,6 +96,9 @@ Peer.prototype._dispatchNotification = function(name, args) { } } +/** + * @private + */ Peer.prototype._dispatchCall = function(name, id, args) { if (this.bindings.hasOwnProperty(name)) { //console.log("Call for:", name, id); @@ -114,6 +125,9 @@ Peer.prototype._dispatchCall = function(name, id, args) { } } +/** + * @private + */ Peer.prototype._dispatchResponse = function(id, res) { if (this.callbacks.hasOwnProperty(id)) { this.callbacks[id].call(this, res); @@ -123,6 +137,14 @@ Peer.prototype._dispatchResponse = function(id, res) { } } +/** + * Register an RPC handler that will be called from a remote machine. Remotely + * passed arguments are provided to the given function as normal arguments, and + * if the function returns a value, it will be returned over the network also. + * + * @param {string} name The name of the function + * @param {function} f A function or lambda to be callable remotely + */ Peer.prototype.bind = function(name, f) { if (this.bindings.hasOwnProperty(name)) { //console.error("Duplicate bind to same procedure"); @@ -132,6 +154,10 @@ Peer.prototype.bind = function(name, f) { } } +/** + * Allow an RPC call to pass through to another machine with minimal local + * processing. + */ Peer.prototype.proxy = function(name, f) { if (this.proxies.hasOwnProperty(name)) { //console.error("Duplicate proxy to same procedure"); @@ -141,6 +167,13 @@ Peer.prototype.proxy = function(name, f) { } } +/** + * Call a procedure on a remote machine. + * + * @param {string} name Name of the procedure + * @param {function} cb Callback to receive return value as argument + * @param {...} args Any number of arguments to also pass to remote procedure + */ Peer.prototype.rpc = function(name, cb, ...args) { let id = this.cbid++; this.callbacks[id] = cb; @@ -160,6 +193,12 @@ Peer.prototype.sendB = function(name, args) { } } +/** + * Call a remote procedure but with no return value expected. + * + * @param {string} name Name of the procedure + * @param {...} args Any number of arguments to also pass to remote procedure + */ Peer.prototype.send = function(name, ...args) { try { this.sock.send(encode([0, name, args])); @@ -173,6 +212,9 @@ Peer.prototype.close = function() { this.status = kDisconnected; } +/** + * @private + */ Peer.prototype._notify = function(evt, ...args) { if (this.events.hasOwnProperty(evt)) { for (let i=0; i<this.events[evt].length; i++) { @@ -182,6 +224,13 @@ Peer.prototype._notify = function(evt, ...args) { } } +/** + * Register a callback for socket events. Events include: 'connect', + * 'disconnect' and 'error'. + * + * @param {string} evt Event name + * @param {function} f Callback on event + */ Peer.prototype.on = function(evt, f) { if (!this.events.hasOwnProperty(evt)) { this.events[evt] = [];