Skip to content
Snippets Groups Projects
Commit 9191d94c authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/wstunnel' into 'master'

Partial implementation of #71 websockets

See merge request nicolas.pope/ftl!27
parents 0c985b45 cbb97acb
Branches
Tags
1 merge request!27Partial implementation of #71 websockets
Pipeline #11320 failed
Showing
with 450 additions and 23 deletions
...@@ -9,3 +9,4 @@ build/ ...@@ -9,3 +9,4 @@ build/
lab-designs/viewer lab-designs/viewer
.vscode .vscode
doc/ doc/
web-service/npm-debug.log
...@@ -17,7 +17,20 @@ Master::Master(Configurable *root, Universe *net) ...@@ -17,7 +17,20 @@ Master::Master(Configurable *root, Universe *net)
} }
}); });
net->bind("node_details", [net,root]() -> std::vector<std::string> {
ftl::config::json_t json {
{"id", net->id().to_string()},
{"title", root->value("title", *root->get<string>("$id"))},
{"kind", "master"}
};
return {json.dump()};
});
net->broadcast("log_subscribe", net->id()); net->broadcast("log_subscribe", net->id());
net->onConnect([this](ftl::net::Peer*) {
net_->broadcast("log_subscribe", net_->id());
});
} }
Master::~Master() { Master::~Master() {
... ...
......
...@@ -35,10 +35,11 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false) ...@@ -35,10 +35,11 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false)
return ftl::config::resolve(uri); return ftl::config::resolve(uri);
}); });
net->bind("slave_details", [net,root]() -> std::vector<std::string> { net->bind("node_details", [net,root]() -> std::vector<std::string> {
ftl::config::json_t json { ftl::config::json_t json {
{"id", net->id().to_string()}, {"id", net->id().to_string()},
{"title", root->value("title", *root->get<string>("$id"))} {"title", root->value("title", *root->get<string>("$id"))},
{"kind", "slave"}
}; };
return {json.dump()}; return {json.dump()};
}); });
...@@ -48,6 +49,10 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false) ...@@ -48,6 +49,10 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false)
log_peers_.push_back(peer); log_peers_.push_back(peer);
}); });
net->bind("connect", [this](const std::string &url) {
net_->connect(url);
});
loguru::add_callback("net_log", netLog, this, loguru::Verbosity_INFO); loguru::add_callback("net_log", netLog, this, loguru::Verbosity_INFO);
} }
... ...
......
...@@ -243,6 +243,7 @@ class Peer { ...@@ -243,6 +243,7 @@ class Peer {
bool is_waiting_; bool is_waiting_;
msgpack::unpacker recv_buf_; msgpack::unpacker recv_buf_;
std::recursive_mutex recv_mtx_; std::recursive_mutex recv_mtx_;
bool ws_read_header_;
// Send buffers // Send buffers
msgpack::vrefbuffer send_buf_; msgpack::vrefbuffer send_buf_;
...@@ -317,7 +318,7 @@ int Peer::asyncCall( ...@@ -317,7 +318,7 @@ int Peer::asyncCall(
auto args_obj = std::make_tuple(args...); auto args_obj = std::make_tuple(args...);
auto rpcid = 0; auto rpcid = 0;
DLOG(1) << "RPC " << name << "() -> " << uri_; LOG(INFO) << "RPC " << name << "() -> " << uri_;
{ {
std::unique_lock<std::recursive_mutex> lk(recv_mtx_); std::unique_lock<std::recursive_mutex> lk(recv_mtx_);
...@@ -329,6 +330,7 @@ int Peer::asyncCall( ...@@ -329,6 +330,7 @@ int Peer::asyncCall(
auto call_obj = std::make_tuple(0,rpcid,name,args_obj); auto call_obj = std::make_tuple(0,rpcid,name,args_obj);
std::unique_lock<std::recursive_mutex> lk(send_mtx_); std::unique_lock<std::recursive_mutex> lk(send_mtx_);
if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0);
msgpack::pack(send_buf_, call_obj); msgpack::pack(send_buf_, call_obj);
_send(); _send();
return rpcid; return rpcid;
... ...
......
...@@ -10,7 +10,7 @@ namespace net { ...@@ -10,7 +10,7 @@ namespace net {
typedef std::tuple<uint64_t, uint32_t, ftl::UUID> Handshake; typedef std::tuple<uint64_t, uint32_t, ftl::UUID> Handshake;
static const uint64_t kMagic = 0x1099340053640912; static const uint64_t kMagic = 0x0009340053640912;
static const uint32_t kVersion = (FTL_VERSION_MAJOR << 16) + static const uint32_t kVersion = (FTL_VERSION_MAJOR << 16) +
(FTL_VERSION_MINOR << 8) + FTL_VERSION_PATCH; (FTL_VERSION_MINOR << 8) + FTL_VERSION_PATCH;
... ...
......
...@@ -327,7 +327,9 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { ...@@ -327,7 +327,9 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) {
std::map<Peer*, int> record; std::map<Peer*, int> record;
std::shared_lock<std::shared_mutex> lk(net_mutex_); std::shared_lock<std::shared_mutex> lk(net_mutex_);
for (auto p : peers_) { for (auto p : peers_) {
if (!p->isConnected()) continue; if (!p->isConnected()) {
continue;
}
sentcount++; sentcount++;
record[p] = p->asyncCall<std::vector<R>>(name, handler, args...); record[p] = p->asyncCall<std::vector<R>>(name, handler, args...);
} }
... ...
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <cstddef> #include <cstddef>
#include <functional> #include <functional>
#include <ftl/uri.hpp> #include <ftl/uri.hpp>
#include <msgpack.hpp>
#include <ftl/net/common.hpp> #include <ftl/net/common.hpp>
...@@ -40,6 +41,8 @@ struct wsheader_type { ...@@ -40,6 +41,8 @@ struct wsheader_type {
*/ */
int ws_dispatch(const char *data, size_t len, std::function<void(const wsheader_type&,const char*,size_t)> d); int ws_dispatch(const char *data, size_t len, std::function<void(const wsheader_type&,const char*,size_t)> d);
int ws_parse(msgpack::unpacker &buf, wsheader_type &ws);
/** /**
* Websocket header constructor. Fills a buffer with the correct websocket * Websocket header constructor. Fills a buffer with the correct websocket
* header for a given opcode, mask setting and message length. * header for a given opcode, mask setting and message length.
... ...
......
...@@ -129,15 +129,17 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const ...@@ -129,15 +129,17 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const
auto &&name = std::get<1>(the_call); auto &&name = std::get<1>(the_call);
auto &&args = std::get<2>(the_call); auto &&args = std::get<2>(the_call);
// LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI();
auto binding = _locateHandler(name); auto binding = _locateHandler(name);
//LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI();
if (binding) { if (binding) {
try { try {
auto result = (*binding)(args); auto result = (*binding)(args);
} catch (const int &e) { } catch (const int &e) {
LOG(ERROR) << "Exception in bound function";
throw &e; throw &e;
} catch (const std::exception &e) {
LOG(ERROR) << "Exception for '" << name << "' - " << e.what();
} }
} else { } else {
LOG(ERROR) << "Missing handler for incoming message (" << name << ")"; LOG(ERROR) << "Missing handler for incoming message (" << name << ")";
... ...
......
...@@ -154,7 +154,7 @@ static SOCKET tcpConnect(URI &uri) { ...@@ -154,7 +154,7 @@ static SOCKET tcpConnect(URI &uri) {
return csocket; return csocket;
} }
Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(false), universe_(u) { Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(false), universe_(u), ws_read_header_(false) {
status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting; status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting;
_updateURI(); _updateURI();
...@@ -196,7 +196,7 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals ...@@ -196,7 +196,7 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals
} }
} }
Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), universe_(u), uri_(pUri) { Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), universe_(u), ws_read_header_(false), uri_(pUri) {
URI uri(pUri); URI uri(pUri);
status_ = kInvalid; status_ = kInvalid;
...@@ -219,6 +219,9 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), ...@@ -219,6 +219,9 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true),
if (!ws_connect(sock_, uri)) { if (!ws_connect(sock_, uri)) {
LOG(ERROR) << "Websocket connection failed"; LOG(ERROR) << "Websocket connection failed";
_badClose(false); _badClose(false);
} else {
status_ = kConnecting;
LOG(INFO) << "WEB SOCK CONNECTED";
} }
} else { } else {
LOG(ERROR) << "Connection refused to " << uri.getHost() << ":" << uri.getPort(); LOG(ERROR) << "Connection refused to " << uri.getHost() << ":" << uri.getPort();
...@@ -374,20 +377,44 @@ void Peer::data() { ...@@ -374,20 +377,44 @@ void Peer::data() {
}, this); }, this);
} }
inline std::ostream& hex_dump(std::ostream& o, std::string const& v) {
std::ios::fmtflags f(o.flags());
o << std::hex;
for (auto c : v) {
o << "0x" << std::setw(2) << std::setfill('0') << (static_cast<int>(c) & 0xff) << ' ';
}
o.flags(f);
return o;
}
bool Peer::_data() { bool Peer::_data() {
std::unique_lock<std::recursive_mutex> lk(recv_mtx_); std::unique_lock<std::recursive_mutex> lk(recv_mtx_);
recv_buf_.reserve_buffer(kMaxMessage); recv_buf_.reserve_buffer(kMaxMessage);
int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0);
if (rc < 0) { if (rc <= 0) {
return false; return false;
} }
recv_buf_.buffer_consumed(rc); recv_buf_.buffer_consumed(rc);
if (scheme_ == ftl::URI::SCHEME_WS && !ws_read_header_) {
wsheader_type ws;
if (ws_parse(recv_buf_, ws) < 0) {
return false;
}
ws_read_header_ = true;
}
/*if (rc > 0) {
hex_dump(std::cout, std::string((char*)recv_buf_.nonparsed_buffer(), recv_buf_.nonparsed_size()));
std::cout << std::endl;
}*/
msgpack::object_handle msg; msgpack::object_handle msg;
while (recv_buf_.next(msg)) { while (recv_buf_.next(msg)) {
ws_read_header_ = false;
msgpack::object obj = msg.get(); msgpack::object obj = msg.get();
if (status_ != kConnected) { if (status_ != kConnected) {
// First message must be a handshake // First message must be a handshake
...@@ -407,6 +434,14 @@ bool Peer::_data() { ...@@ -407,6 +434,14 @@ bool Peer::_data() {
} }
} }
disp_->dispatch(*this, obj); disp_->dispatch(*this, obj);
if (recv_buf_.nonparsed_size() > 0 && scheme_ == ftl::URI::SCHEME_WS) {
wsheader_type ws;
if (ws_parse(recv_buf_, ws) < 0) {
return false;
}
ws_read_header_ = true;
}
} }
return false; return false;
} }
...@@ -434,6 +469,7 @@ void Peer::cancelCall(int id) { ...@@ -434,6 +469,7 @@ void Peer::cancelCall(int id) {
void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res); Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res);
std::unique_lock<std::recursive_mutex> lk(send_mtx_); std::unique_lock<std::recursive_mutex> lk(send_mtx_);
if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0);
msgpack::pack(send_buf_, res_obj); msgpack::pack(send_buf_, res_obj);
_send(); _send();
} }
...@@ -481,10 +517,12 @@ int Peer::_send() { ...@@ -481,10 +517,12 @@ int Peer::_send() {
char buf[20]; // TODO(nick) Should not be a stack buffer. char buf[20]; // TODO(nick) Should not be a stack buffer.
// Calculate total size of message // Calculate total size of message
for (size_t i=0; i < size; i++) { for (size_t i=1; i < size; i++) {
len += sendvec[i].iov_len; len += sendvec[i].iov_len;
} }
//LOG(INFO) << "SEND SIZE = " << len;
// Pack correct websocket header into buffer // Pack correct websocket header into buffer
int rc = ws_prepare(wsheader_type::BINARY_FRAME, false, len, buf, 20); int rc = ws_prepare(wsheader_type::BINARY_FRAME, false, len, buf, 20);
if (rc == -1) return -1; if (rc == -1) return -1;
... ...
......
...@@ -76,6 +76,61 @@ int ftl::net::ws_dispatch(const char *data, size_t len, std::function<void(const ...@@ -76,6 +76,61 @@ int ftl::net::ws_dispatch(const char *data, size_t len, std::function<void(const
return (int)(ws.header_size+ws.N); return (int)(ws.header_size+ws.N);
} }
/* Taken from easywsclient. */
int ftl::net::ws_parse(msgpack::unpacker &buf, wsheader_type &ws) {
//wsheader_type ws;
unsigned char *data = (unsigned char*)(buf.nonparsed_buffer());
auto len = buf.nonparsed_size();
if (len < 2) return -1;
ws.fin = (data[0] & 0x80) == 0x80;
ws.opcode = (wsheader_type::opcode_type) (data[0] & 0x0f);
ws.mask = (data[1] & 0x80) == 0x80;
ws.N0 = (data[1] & 0x7f);
ws.header_size = 2 + (ws.N0 == 126? 2 : 0) + (ws.N0 == 127? 8 : 0) + (ws.mask? 4 : 0);
if (len < ws.header_size) return -1;
int i = 0;
if (ws.N0 < 126) {
ws.N = ws.N0;
i = 2;
} else if (ws.N0 == 126) {
ws.N = 0;
ws.N |= ((uint64_t) data[2]) << 8;
ws.N |= ((uint64_t) data[3]) << 0;
i = 4;
} else if (ws.N0 == 127) {
ws.N = 0;
ws.N |= ((uint64_t) data[2]) << 56;
ws.N |= ((uint64_t) data[3]) << 48;
ws.N |= ((uint64_t) data[4]) << 40;
ws.N |= ((uint64_t) data[5]) << 32;
ws.N |= ((uint64_t) data[6]) << 24;
ws.N |= ((uint64_t) data[7]) << 16;
ws.N |= ((uint64_t) data[8]) << 8;
ws.N |= ((uint64_t) data[9]) << 0;
i = 10;
}
if (ws.mask) {
ws.masking_key[0] = ((uint8_t) data[i+0]) << 0;
ws.masking_key[1] = ((uint8_t) data[i+1]) << 0;
ws.masking_key[2] = ((uint8_t) data[i+2]) << 0;
ws.masking_key[3] = ((uint8_t) data[i+3]) << 0;
} else {
ws.masking_key[0] = 0;
ws.masking_key[1] = 0;
ws.masking_key[2] = 0;
ws.masking_key[3] = 0;
}
//if (len < ws.header_size+ws.N) return -1;
buf.skip_nonparsed_buffer(ws.header_size);
return (int)(ws.header_size+ws.N);
}
int ftl::net::ws_prepare(wsheader_type::opcode_type op, bool useMask, size_t len, char *data, size_t maxlen) { int ftl::net::ws_prepare(wsheader_type::opcode_type op, bool useMask, size_t len, char *data, size_t maxlen) {
// TODO: // TODO:
// Masking key should (must) be derived from a high quality random // Masking key should (must) be derived from a high quality random
... ...
......
...@@ -109,6 +109,8 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID ...@@ -109,6 +109,8 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID
if (rate < 0 || rate >= 10) return; if (rate < 0 || rate >= 10) return;
if (N < 0 || N > ftl::rgbd::kMaxFrames) return; if (N < 0 || N > ftl::rgbd::kMaxFrames) return;
LOG(INFO) << "Adding Stream Peer: " << peer.to_string();
StreamClient c; StreamClient c;
c.peerid = peer; c.peerid = peer;
c.uri = dest; c.uri = dest;
... ...
......
...@@ -205,5 +205,11 @@ ...@@ -205,5 +205,11 @@
"peers": ["tcp://localhost:9001", "tcp://localhost:9002"] "peers": ["tcp://localhost:9001", "tcp://localhost:9002"]
}, },
"source": {"uri": "ftl://utu.fi#vision_default/source"} "source": {"uri": "ftl://utu.fi#vision_default/source"}
},
"gui_web": {
"net": {
"peers": ["ws://localhost:8080/"]
}
} }
} }
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"express": "^4.16.4", "express": "^4.16.4",
"express-ws": "^4.0.0" "express-ws": "^4.0.0",
"msgpack5": "^4.2.1"
} }
} }
const express = require('express'); const express = require('express');
const app = express(); const app = express();
const expressWs = require('express-ws')(app); const expressWs = require('express-ws')(app);
const Peer = require('./peer.js');
// ---- INDEXES ----------------------------------------------------------------
let peer_by_id = {};
//let uri_to_peer = {};
let peer_uris = {};
let uri_data = {};
// ---- PROTOCOL ---------------------------------------------------------------
app.get('/', (req, res) => { app.get('/', (req, res) => {
res.end(); res.end();
}); });
app.get('/streams', (req, res) => {
res.json(Object.keys(uri_data));
});
app.get('/stream/rgb', (req, res) => {
let uri = req.query.uri;
if (uri_data.hasOwnProperty(uri)) {
res.writeHead(200, {'Content-Type': 'image/jpeg'});
res.end(uri_data[uri].rgb);
}
res.end();
});
app.get('/stream/depth', (req, res) => {
let uri = req.query.uri;
if (uri_data.hasOwnProperty(uri)) {
res.writeHead(200, {'Content-Type': 'image/png'});
res.end(uri_data[uri].depth);
}
res.end();
});
//app.get('/stream', (req, res))
app.ws('/', (ws, req) => { app.ws('/', (ws, req) => {
console.log("New web socket request"); console.log("New web socket request");
// SEND Handshake
ws.on('message', (msg) => { let p = new Peer(ws);
console.log("Message", msg);
p.on("connect", (peer) => {
peer.rpc("node_details", (details) => {
let obj = JSON.parse(details[0]);
peer.uri = obj.id;
peer.name = obj.title;
peer.master = (obj.kind == "master");
console.log("Peer name = ", peer.name);
peer_uris[peer.string_id] = [];
peer_by_id[peer.string_id] = peer;
if (!peer.master) {
peer.rpc("list_streams", (streams) => {
console.log("STREAMS", streams);
for (let i=0; i<streams.length; i++) {
//uri_to_peer[streams[i]] = peer;
peer_uris[peer.string_id].push(streams[i]);
uri_data[streams[i]] = {
peer: peer,
title: "",
rgb: null,
depth: null,
pose: null
};
}
});
}
});
});
p.on("disconnect", (peer) => {
console.log("DISCONNECT");
// Remove all peer details and streams....
let puris = peer_uris[peer.string_id];
if (puris) {
for (let i=0; i<puris.length; i++) {
console.log("Removing stream: ", puris[i]);
//delete uri_to_peer[puris[i]];
delete uri_data[puris[i]];
}
delete peer_uris[peer.string_id];
}
if (peer_by_id.hasOwnProperty(peer.string_id)) delete peer_by_id[peer.string_id];
});
p.bind("list_streams", () => {
return Object.keys(uri_data);
});
p.bind("find_stream", (uri) => {
if (uri_data.hasOwnProperty(uri)) {
return [Peer.uuid];
} else {
return null; // or []??
}
});
p.proxy("source_calibration", (cb, uri) => {
let peer = uri_data[uri].peer;
if (peer) {
peer.rpc("source_calibration", cb, uri);
}
});
p.bind("set_pose", (uri, vec) => {
//console.log("SET POSE");
let peer = uri_data[uri].peer;
if (peer) {
uri_data[uri].pose = vec;
peer.send("set_pose", uri, vec);
}
}); });
ws.on('error', () => {
console.log("Error"); p.bind("get_stream", (uri, N, rate, pid, dest) => {
let peer = uri_data[uri].peer;
if (peer) {
peer.bind(uri, (rgb, depth) => {
uri_data[uri].rgb = rgb;
uri_data[uri].depth = depth;
p.send(uri, rgb, depth);
}); });
ws.on('close', () => { peer.send("get_stream", uri, N, rate, [Peer.uuid], dest);
console.log("Close"); }
}); });
}); });
console.log("Listening or port 3000"); console.log("Listening or port 8080");
app.listen(3000); app.listen(8080);
const msgpack = require('msgpack5')()
, encode = msgpack.encode
, decode = msgpack.decode;
const kConnecting = 1;
const kConnected = 2;
const kDisconnected = 3;
let my_uuid = new Uint8Array(16);
my_uuid[0] = 44;
my_uuid = Buffer.from(my_uuid);
const kMagic = 0x0009340053640912;
const kVersion = 0;
function Peer(ws) {
this.sock = ws;
this.status = kConnecting;
this.id = null;
this.string_id = "";
this.bindings = {};
this.proxies = {};
this.events = {};
this.callbacks = {};
this.cbid = 0;
this.uri = "unknown";
this.name = "unknown";
this.master = false;
this.sock.on("message", (raw) => {
let msg = decode(raw);
if (this.status == kConnecting) {
if (msg[1] != "__handshake__") {
console.log("Bad handshake");
this.close();
}
}
//console.log("MSG", msg);
if (msg[0] == 0) {
// Notification
if (msg.length == 3) {
this._dispatchNotification(msg[1], msg[2]);
// Call
} else {
this._dispatchCall(msg[2], msg[1], msg[3]);
}
} else if (msg[0] == 1) {
this._dispatchResponse(msg[1], msg[3]);
}
});
this.sock.on("close", () => {
this.status = kDisconnected;
this._notify("disconnect", this);
});
this.sock.on("error", () => {
console.error("Socket error");
this.sock.close();
this.status = kDisconnected;
});
this.bind("__handshake__", (magic, version, id) => {
if (magic == kMagic) {
console.log("Handshake received");
this.status = kConnected;
this.id = id.buffer;
this.string_id = id.toString('hex');
this._notify("connect", this);
} else {
console.log("Magic does not match");
this.close();
}
});
this.send("__handshake__", kMagic, kVersion, [my_uuid]);
}
Peer.uuid = my_uuid;
Peer.prototype._dispatchNotification = function(name, args) {
if (this.bindings.hasOwnProperty(name)) {
this.bindings[name].apply(this, args);
} else {
console.log("Missing handler for: ", name);
}
}
Peer.prototype._dispatchCall = function(name, id, args) {
if (this.bindings.hasOwnProperty(name)) {
console.log("Call for:", name, id);
let res = this.bindings[name].apply(this, args);
try {
this.sock.send(encode([1,id,name,res]));
} catch(e) {
this.close();
}
} else if (this.proxies.hasOwnProperty(name)) {
console.log("Proxy for:", name, id);
args.unshift((res) => {
try {
this.sock.send(encode([1,id,name,res]));
} catch(e) {
this.close();
}
});
this.proxies[name].apply(this, args);
} else {
console.log("Missing handler for: ", name);
}
}
Peer.prototype._dispatchResponse = function(id, res) {
if (this.callbacks.hasOwnProperty(id)) {
this.callbacks[id].call(this, res);
delete this.callbacks[id];
} else {
console.log("Missing callback");
}
}
Peer.prototype.bind = function(name, f) {
if (this.bindings.hasOwnProperty(name)) {
//console.error("Duplicate bind to same procedure");
this.bindings[name] = f;
} else {
this.bindings[name] = f;
}
}
Peer.prototype.proxy = function(name, f) {
if (this.proxies.hasOwnProperty(name)) {
//console.error("Duplicate proxy to same procedure");
this.proxies[name] = f;
} else {
this.proxies[name] = f;
}
}
Peer.prototype.rpc = function(name, cb, ...args) {
let id = this.cbid++;
this.callbacks[id] = cb;
try {
this.sock.send(encode([0, id, name, args]));
} catch(e) {
this.close();
}
}
Peer.prototype.send = function(name, ...args) {
try {
this.sock.send(encode([0, name, args]));
} catch(e) {
this.close();
}
}
Peer.prototype.close = function() {
this.sock.close();
this.status = kDisconnected;
}
Peer.prototype._notify = function(evt, ...args) {
if (this.events.hasOwnProperty(evt)) {
for (let i=0; i<this.events[evt].length; i++) {
let f = this.events[evt][i];
f.apply(this, args);
}
}
}
Peer.prototype.on = function(evt, f) {
if (!this.events.hasOwnProperty(evt)) {
this.events[evt] = [];
}
this.events[evt].push(f);
}
module.exports = Peer;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment