From cbb97acb0afd82fd7bcc641b9ccc815aede7bcd1 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nicolas.pope@utu.fi> Date: Tue, 11 Jun 2019 13:36:55 +0300 Subject: [PATCH] Partial implementation of #71 websockets --- .gitignore | 1 + components/control/cpp/src/master.cpp | 13 ++ components/control/cpp/src/slave.cpp | 9 +- components/net/cpp/include/ftl/net/peer.hpp | 4 +- .../net/cpp/include/ftl/net/protocol.hpp | 2 +- .../net/cpp/include/ftl/net/universe.hpp | 4 +- .../net/cpp/include/ftl/net/ws_internal.hpp | 3 + components/net/cpp/src/dispatcher.cpp | 6 +- components/net/cpp/src/peer.cpp | 48 ++++- components/net/cpp/src/ws_internal.cpp | 57 +++++- components/rgbd-sources/src/streamer.cpp | 2 + config/config.jsonc | 6 + web-service/package.json | 3 +- web-service/src/index.js | 133 ++++++++++++- web-service/src/peer.js | 182 ++++++++++++++++++ 15 files changed, 450 insertions(+), 23 deletions(-) create mode 100644 web-service/src/peer.js diff --git a/.gitignore b/.gitignore index 0cad48ae0..8450d6e9c 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ build/ lab-designs/viewer .vscode doc/ +web-service/npm-debug.log diff --git a/components/control/cpp/src/master.cpp b/components/control/cpp/src/master.cpp index 7bd1f753a..36108610f 100644 --- a/components/control/cpp/src/master.cpp +++ b/components/control/cpp/src/master.cpp @@ -17,7 +17,20 @@ Master::Master(Configurable *root, Universe *net) } }); + net->bind("node_details", [net,root]() -> std::vector<std::string> { + ftl::config::json_t json { + {"id", net->id().to_string()}, + {"title", root->value("title", *root->get<string>("$id"))}, + {"kind", "master"} + }; + return {json.dump()}; + }); + net->broadcast("log_subscribe", net->id()); + + net->onConnect([this](ftl::net::Peer*) { + net_->broadcast("log_subscribe", net_->id()); + }); } Master::~Master() { diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp index 583c16ad4..118a68265 100644 --- a/components/control/cpp/src/slave.cpp +++ b/components/control/cpp/src/slave.cpp @@ -35,10 +35,11 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false) return ftl::config::resolve(uri); }); - net->bind("slave_details", [net,root]() -> std::vector<std::string> { + net->bind("node_details", [net,root]() -> std::vector<std::string> { ftl::config::json_t json { {"id", net->id().to_string()}, - {"title", root->value("title", *root->get<string>("$id"))} + {"title", root->value("title", *root->get<string>("$id"))}, + {"kind", "slave"} }; return {json.dump()}; }); @@ -48,6 +49,10 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false) log_peers_.push_back(peer); }); + net->bind("connect", [this](const std::string &url) { + net_->connect(url); + }); + loguru::add_callback("net_log", netLog, this, loguru::Verbosity_INFO); } diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index fd89e08fe..b086f8d35 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -243,6 +243,7 @@ class Peer { bool is_waiting_; msgpack::unpacker recv_buf_; std::recursive_mutex recv_mtx_; + bool ws_read_header_; // Send buffers msgpack::vrefbuffer send_buf_; @@ -317,7 +318,7 @@ int Peer::asyncCall( auto args_obj = std::make_tuple(args...); auto rpcid = 0; - DLOG(1) << "RPC " << name << "() -> " << uri_; + LOG(INFO) << "RPC " << name << "() -> " << uri_; { std::unique_lock<std::recursive_mutex> lk(recv_mtx_); @@ -329,6 +330,7 @@ int Peer::asyncCall( auto call_obj = std::make_tuple(0,rpcid,name,args_obj); std::unique_lock<std::recursive_mutex> lk(send_mtx_); + if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); msgpack::pack(send_buf_, call_obj); _send(); return rpcid; diff --git a/components/net/cpp/include/ftl/net/protocol.hpp b/components/net/cpp/include/ftl/net/protocol.hpp index b9effae82..6ae76c054 100644 --- a/components/net/cpp/include/ftl/net/protocol.hpp +++ b/components/net/cpp/include/ftl/net/protocol.hpp @@ -10,7 +10,7 @@ namespace net { typedef std::tuple<uint64_t, uint32_t, ftl::UUID> Handshake; -static const uint64_t kMagic = 0x1099340053640912; +static const uint64_t kMagic = 0x0009340053640912; static const uint32_t kVersion = (FTL_VERSION_MAJOR << 16) + (FTL_VERSION_MINOR << 8) + FTL_VERSION_PATCH; diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 1b233519a..306747a65 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -327,7 +327,9 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { std::map<Peer*, int> record; std::shared_lock<std::shared_mutex> lk(net_mutex_); for (auto p : peers_) { - if (!p->isConnected()) continue; + if (!p->isConnected()) { + continue; + } sentcount++; record[p] = p->asyncCall<std::vector<R>>(name, handler, args...); } diff --git a/components/net/cpp/include/ftl/net/ws_internal.hpp b/components/net/cpp/include/ftl/net/ws_internal.hpp index ea0471a88..2e54aa01d 100644 --- a/components/net/cpp/include/ftl/net/ws_internal.hpp +++ b/components/net/cpp/include/ftl/net/ws_internal.hpp @@ -5,6 +5,7 @@ #include <cstddef> #include <functional> #include <ftl/uri.hpp> +#include <msgpack.hpp> #include <ftl/net/common.hpp> @@ -40,6 +41,8 @@ struct wsheader_type { */ int ws_dispatch(const char *data, size_t len, std::function<void(const wsheader_type&,const char*,size_t)> d); +int ws_parse(msgpack::unpacker &buf, wsheader_type &ws); + /** * Websocket header constructor. Fills a buffer with the correct websocket * header for a given opcode, mask setting and message length. diff --git a/components/net/cpp/src/dispatcher.cpp b/components/net/cpp/src/dispatcher.cpp index cd5cb51f3..ec5781d6a 100644 --- a/components/net/cpp/src/dispatcher.cpp +++ b/components/net/cpp/src/dispatcher.cpp @@ -128,16 +128,18 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const auto &&name = std::get<1>(the_call); auto &&args = std::get<2>(the_call); - - // LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI(); auto binding = _locateHandler(name); + //LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI(); if (binding) { try { auto result = (*binding)(args); } catch (const int &e) { + LOG(ERROR) << "Exception in bound function"; throw &e; + } catch (const std::exception &e) { + LOG(ERROR) << "Exception for '" << name << "' - " << e.what(); } } else { LOG(ERROR) << "Missing handler for incoming message (" << name << ")"; diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index baca8fda7..795970641 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -154,7 +154,7 @@ static SOCKET tcpConnect(URI &uri) { return csocket; } -Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(false), universe_(u) { +Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(false), universe_(u), ws_read_header_(false) { status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting; _updateURI(); @@ -196,7 +196,7 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals } } -Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), universe_(u), uri_(pUri) { +Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), universe_(u), ws_read_header_(false), uri_(pUri) { URI uri(pUri); status_ = kInvalid; @@ -219,6 +219,9 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), if (!ws_connect(sock_, uri)) { LOG(ERROR) << "Websocket connection failed"; _badClose(false); + } else { + status_ = kConnecting; + LOG(INFO) << "WEB SOCK CONNECTED"; } } else { LOG(ERROR) << "Connection refused to " << uri.getHost() << ":" << uri.getPort(); @@ -374,20 +377,44 @@ void Peer::data() { }, this); } +inline std::ostream& hex_dump(std::ostream& o, std::string const& v) { + std::ios::fmtflags f(o.flags()); + o << std::hex; + for (auto c : v) { + o << "0x" << std::setw(2) << std::setfill('0') << (static_cast<int>(c) & 0xff) << ' '; + } + o.flags(f); + return o; +} + bool Peer::_data() { std::unique_lock<std::recursive_mutex> lk(recv_mtx_); recv_buf_.reserve_buffer(kMaxMessage); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); - if (rc < 0) { + if (rc <= 0) { return false; } recv_buf_.buffer_consumed(rc); - + + if (scheme_ == ftl::URI::SCHEME_WS && !ws_read_header_) { + wsheader_type ws; + if (ws_parse(recv_buf_, ws) < 0) { + return false; + } + ws_read_header_ = true; + } + + /*if (rc > 0) { + hex_dump(std::cout, std::string((char*)recv_buf_.nonparsed_buffer(), recv_buf_.nonparsed_size())); + std::cout << std::endl; + }*/ + msgpack::object_handle msg; while (recv_buf_.next(msg)) { + ws_read_header_ = false; msgpack::object obj = msg.get(); if (status_ != kConnected) { // First message must be a handshake @@ -407,6 +434,14 @@ bool Peer::_data() { } } disp_->dispatch(*this, obj); + + if (recv_buf_.nonparsed_size() > 0 && scheme_ == ftl::URI::SCHEME_WS) { + wsheader_type ws; + if (ws_parse(recv_buf_, ws) < 0) { + return false; + } + ws_read_header_ = true; + } } return false; } @@ -434,6 +469,7 @@ void Peer::cancelCall(int id) { void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res); std::unique_lock<std::recursive_mutex> lk(send_mtx_); + if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); msgpack::pack(send_buf_, res_obj); _send(); } @@ -481,9 +517,11 @@ int Peer::_send() { char buf[20]; // TODO(nick) Should not be a stack buffer. // Calculate total size of message - for (size_t i=0; i < size; i++) { + for (size_t i=1; i < size; i++) { len += sendvec[i].iov_len; } + + //LOG(INFO) << "SEND SIZE = " << len; // Pack correct websocket header into buffer int rc = ws_prepare(wsheader_type::BINARY_FRAME, false, len, buf, 20); diff --git a/components/net/cpp/src/ws_internal.cpp b/components/net/cpp/src/ws_internal.cpp index a58a47d77..c0cf9630b 100644 --- a/components/net/cpp/src/ws_internal.cpp +++ b/components/net/cpp/src/ws_internal.cpp @@ -76,6 +76,61 @@ int ftl::net::ws_dispatch(const char *data, size_t len, std::function<void(const return (int)(ws.header_size+ws.N); } +/* Taken from easywsclient. */ +int ftl::net::ws_parse(msgpack::unpacker &buf, wsheader_type &ws) { + //wsheader_type ws; + unsigned char *data = (unsigned char*)(buf.nonparsed_buffer()); + auto len = buf.nonparsed_size(); + + if (len < 2) return -1; + + ws.fin = (data[0] & 0x80) == 0x80; + ws.opcode = (wsheader_type::opcode_type) (data[0] & 0x0f); + ws.mask = (data[1] & 0x80) == 0x80; + ws.N0 = (data[1] & 0x7f); + ws.header_size = 2 + (ws.N0 == 126? 2 : 0) + (ws.N0 == 127? 8 : 0) + (ws.mask? 4 : 0); + + if (len < ws.header_size) return -1; + + int i = 0; + if (ws.N0 < 126) { + ws.N = ws.N0; + i = 2; + } else if (ws.N0 == 126) { + ws.N = 0; + ws.N |= ((uint64_t) data[2]) << 8; + ws.N |= ((uint64_t) data[3]) << 0; + i = 4; + } else if (ws.N0 == 127) { + ws.N = 0; + ws.N |= ((uint64_t) data[2]) << 56; + ws.N |= ((uint64_t) data[3]) << 48; + ws.N |= ((uint64_t) data[4]) << 40; + ws.N |= ((uint64_t) data[5]) << 32; + ws.N |= ((uint64_t) data[6]) << 24; + ws.N |= ((uint64_t) data[7]) << 16; + ws.N |= ((uint64_t) data[8]) << 8; + ws.N |= ((uint64_t) data[9]) << 0; + i = 10; + } + + if (ws.mask) { + ws.masking_key[0] = ((uint8_t) data[i+0]) << 0; + ws.masking_key[1] = ((uint8_t) data[i+1]) << 0; + ws.masking_key[2] = ((uint8_t) data[i+2]) << 0; + ws.masking_key[3] = ((uint8_t) data[i+3]) << 0; + } else { + ws.masking_key[0] = 0; + ws.masking_key[1] = 0; + ws.masking_key[2] = 0; + ws.masking_key[3] = 0; + } + + //if (len < ws.header_size+ws.N) return -1; + buf.skip_nonparsed_buffer(ws.header_size); + return (int)(ws.header_size+ws.N); +} + int ftl::net::ws_prepare(wsheader_type::opcode_type op, bool useMask, size_t len, char *data, size_t maxlen) { // TODO: // Masking key should (must) be derived from a high quality random @@ -125,7 +180,7 @@ int ftl::net::ws_prepare(wsheader_type::opcode_type op, bool useMask, size_t len header[13] = masking_key[3]; } } - + return (int)header_size; } diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 2ea5babc8..c79b2f39a 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -109,6 +109,8 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID if (rate < 0 || rate >= 10) return; if (N < 0 || N > ftl::rgbd::kMaxFrames) return; + LOG(INFO) << "Adding Stream Peer: " << peer.to_string(); + StreamClient c; c.peerid = peer; c.uri = dest; diff --git a/config/config.jsonc b/config/config.jsonc index a9ea3321a..cf69dbfc7 100644 --- a/config/config.jsonc +++ b/config/config.jsonc @@ -205,5 +205,11 @@ "peers": ["tcp://localhost:9001", "tcp://localhost:9002"] }, "source": {"uri": "ftl://utu.fi#vision_default/source"} + }, + + "gui_web": { + "net": { + "peers": ["ws://localhost:8080/"] + } } } diff --git a/web-service/package.json b/web-service/package.json index ea4d7e8fc..6dee76250 100644 --- a/web-service/package.json +++ b/web-service/package.json @@ -14,6 +14,7 @@ "license": "ISC", "dependencies": { "express": "^4.16.4", - "express-ws": "^4.0.0" + "express-ws": "^4.0.0", + "msgpack5": "^4.2.1" } } diff --git a/web-service/src/index.js b/web-service/src/index.js index 42d7cfefb..6a67fce8b 100644 --- a/web-service/src/index.js +++ b/web-service/src/index.js @@ -1,25 +1,140 @@ const express = require('express'); const app = express(); const expressWs = require('express-ws')(app); +const Peer = require('./peer.js'); + +// ---- INDEXES ---------------------------------------------------------------- + +let peer_by_id = {}; +//let uri_to_peer = {}; +let peer_uris = {}; + +let uri_data = {}; + +// ---- PROTOCOL --------------------------------------------------------------- app.get('/', (req, res) => { res.end(); }); +app.get('/streams', (req, res) => { + res.json(Object.keys(uri_data)); +}); + +app.get('/stream/rgb', (req, res) => { + let uri = req.query.uri; + if (uri_data.hasOwnProperty(uri)) { + res.writeHead(200, {'Content-Type': 'image/jpeg'}); + res.end(uri_data[uri].rgb); + } + res.end(); +}); + +app.get('/stream/depth', (req, res) => { + let uri = req.query.uri; + if (uri_data.hasOwnProperty(uri)) { + res.writeHead(200, {'Content-Type': 'image/png'}); + res.end(uri_data[uri].depth); + } + res.end(); +}); + +//app.get('/stream', (req, res)) + app.ws('/', (ws, req) => { console.log("New web socket request"); - // SEND Handshake - ws.on('message', (msg) => { - console.log("Message", msg); + + let p = new Peer(ws); + + p.on("connect", (peer) => { + peer.rpc("node_details", (details) => { + let obj = JSON.parse(details[0]); + + peer.uri = obj.id; + peer.name = obj.title; + peer.master = (obj.kind == "master"); + console.log("Peer name = ", peer.name); + + peer_uris[peer.string_id] = []; + peer_by_id[peer.string_id] = peer; + + if (!peer.master) { + peer.rpc("list_streams", (streams) => { + console.log("STREAMS", streams); + for (let i=0; i<streams.length; i++) { + //uri_to_peer[streams[i]] = peer; + peer_uris[peer.string_id].push(streams[i]); + + uri_data[streams[i]] = { + peer: peer, + title: "", + rgb: null, + depth: null, + pose: null + }; + } + }); + } + }); }); - ws.on('error', () => { - console.log("Error"); + + p.on("disconnect", (peer) => { + console.log("DISCONNECT"); + // Remove all peer details and streams.... + + let puris = peer_uris[peer.string_id]; + if (puris) { + for (let i=0; i<puris.length; i++) { + console.log("Removing stream: ", puris[i]); + //delete uri_to_peer[puris[i]]; + delete uri_data[puris[i]]; + } + delete peer_uris[peer.string_id]; + } + if (peer_by_id.hasOwnProperty(peer.string_id)) delete peer_by_id[peer.string_id]; + }); + + p.bind("list_streams", () => { + return Object.keys(uri_data); }); - ws.on('close', () => { - console.log("Close"); + + p.bind("find_stream", (uri) => { + if (uri_data.hasOwnProperty(uri)) { + return [Peer.uuid]; + } else { + return null; // or []?? + } + }); + + p.proxy("source_calibration", (cb, uri) => { + let peer = uri_data[uri].peer; + if (peer) { + peer.rpc("source_calibration", cb, uri); + } + }); + + p.bind("set_pose", (uri, vec) => { + //console.log("SET POSE"); + let peer = uri_data[uri].peer; + if (peer) { + uri_data[uri].pose = vec; + peer.send("set_pose", uri, vec); + } + }); + + p.bind("get_stream", (uri, N, rate, pid, dest) => { + let peer = uri_data[uri].peer; + if (peer) { + peer.bind(uri, (rgb, depth) => { + uri_data[uri].rgb = rgb; + uri_data[uri].depth = depth; + p.send(uri, rgb, depth); + }); + peer.send("get_stream", uri, N, rate, [Peer.uuid], dest); + } }); }); -console.log("Listening or port 3000"); -app.listen(3000); +console.log("Listening or port 8080"); +app.listen(8080); diff --git a/web-service/src/peer.js b/web-service/src/peer.js new file mode 100644 index 000000000..b90426ed1 --- /dev/null +++ b/web-service/src/peer.js @@ -0,0 +1,182 @@ +const msgpack = require('msgpack5')() + , encode = msgpack.encode + , decode = msgpack.decode; + +const kConnecting = 1; +const kConnected = 2; +const kDisconnected = 3; + +let my_uuid = new Uint8Array(16); +my_uuid[0] = 44; +my_uuid = Buffer.from(my_uuid); + +const kMagic = 0x0009340053640912; +const kVersion = 0; + +function Peer(ws) { + this.sock = ws; + this.status = kConnecting; + this.id = null; + this.string_id = ""; + this.bindings = {}; + this.proxies = {}; + this.events = {}; + this.callbacks = {}; + this.cbid = 0; + + this.uri = "unknown"; + this.name = "unknown"; + this.master = false; + + this.sock.on("message", (raw) => { + let msg = decode(raw); + if (this.status == kConnecting) { + if (msg[1] != "__handshake__") { + console.log("Bad handshake"); + this.close(); + } + } + //console.log("MSG", msg); + if (msg[0] == 0) { + // Notification + if (msg.length == 3) { + this._dispatchNotification(msg[1], msg[2]); + // Call + } else { + this._dispatchCall(msg[2], msg[1], msg[3]); + } + } else if (msg[0] == 1) { + this._dispatchResponse(msg[1], msg[3]); + } + }); + + this.sock.on("close", () => { + this.status = kDisconnected; + this._notify("disconnect", this); + }); + + this.sock.on("error", () => { + console.error("Socket error"); + this.sock.close(); + this.status = kDisconnected; + }); + + this.bind("__handshake__", (magic, version, id) => { + if (magic == kMagic) { + console.log("Handshake received"); + this.status = kConnected; + this.id = id.buffer; + this.string_id = id.toString('hex'); + this._notify("connect", this); + } else { + console.log("Magic does not match"); + this.close(); + } + }); + + this.send("__handshake__", kMagic, kVersion, [my_uuid]); +} + +Peer.uuid = my_uuid; + +Peer.prototype._dispatchNotification = function(name, args) { + if (this.bindings.hasOwnProperty(name)) { + this.bindings[name].apply(this, args); + } else { + console.log("Missing handler for: ", name); + } +} + +Peer.prototype._dispatchCall = function(name, id, args) { + if (this.bindings.hasOwnProperty(name)) { + console.log("Call for:", name, id); + let res = this.bindings[name].apply(this, args); + + try { + this.sock.send(encode([1,id,name,res])); + } catch(e) { + this.close(); + } + } else if (this.proxies.hasOwnProperty(name)) { + console.log("Proxy for:", name, id); + args.unshift((res) => { + try { + this.sock.send(encode([1,id,name,res])); + } catch(e) { + this.close(); + } + }); + this.proxies[name].apply(this, args); + } else { + console.log("Missing handler for: ", name); + } +} + +Peer.prototype._dispatchResponse = function(id, res) { + if (this.callbacks.hasOwnProperty(id)) { + this.callbacks[id].call(this, res); + delete this.callbacks[id]; + } else { + console.log("Missing callback"); + } +} + +Peer.prototype.bind = function(name, f) { + if (this.bindings.hasOwnProperty(name)) { + //console.error("Duplicate bind to same procedure"); + this.bindings[name] = f; + } else { + this.bindings[name] = f; + } +} + +Peer.prototype.proxy = function(name, f) { + if (this.proxies.hasOwnProperty(name)) { + //console.error("Duplicate proxy to same procedure"); + this.proxies[name] = f; + } else { + this.proxies[name] = f; + } +} + +Peer.prototype.rpc = function(name, cb, ...args) { + let id = this.cbid++; + this.callbacks[id] = cb; + + try { + this.sock.send(encode([0, id, name, args])); + } catch(e) { + this.close(); + } +} + +Peer.prototype.send = function(name, ...args) { + try { + this.sock.send(encode([0, name, args])); + } catch(e) { + this.close(); + } +} + +Peer.prototype.close = function() { + this.sock.close(); + this.status = kDisconnected; +} + +Peer.prototype._notify = function(evt, ...args) { + if (this.events.hasOwnProperty(evt)) { + for (let i=0; i<this.events[evt].length; i++) { + let f = this.events[evt][i]; + f.apply(this, args); + } + } +} + +Peer.prototype.on = function(evt, f) { + if (!this.events.hasOwnProperty(evt)) { + this.events[evt] = []; + } + this.events[evt].push(f); +} + +module.exports = Peer; -- GitLab