diff --git a/src/Peer.ts b/src/Peer.ts index 0835ce705f905ece6cc3094a3a38c6421e6769a4..47f3bb3ffa5a7e626b9203ee84347775e590ed10 100644 --- a/src/Peer.ts +++ b/src/Peer.ts @@ -30,6 +30,18 @@ export interface Peer extends Emitter {} export type RPCMethod = (...args: unknown[]) => unknown; +interface IStatistics { + txTotal: number; + rxTotal: number; + txBytes: number; + rxBytes: number; + txRate: number; + rxRate: number; + txRequested: number; + txRatio: number; // Requested over sent, should be close to 1 + duration: number; +}; + /** * Wrap a web socket with a MsgPack RCP protocol that works with our C++ version. * @param {websocket} ws Websocket object @@ -45,7 +57,6 @@ export class Peer { callbacks = {}; cbid = 0; server = false; - sendCount = 0; latency = 0; @@ -54,12 +65,25 @@ export class Peer { master = false; txBytes = 0; + txRequested = 0; rxBytes = 0; lastStatsCall = Date.now(); - lastStats = null; + lastStats: IStatistics = { + txTotal: 0, + rxTotal: 0, + txBytes: 0, + rxBytes: 0, + txRate: 0, + rxRate: 0, + txRequested: 0, + txRatio: 0, + duration: 0, + }; static uuid: string; + static peers: Map<string, Peer> = new Map(); + constructor(ws?: WebSocketConnection, server = false) { this.sock = ws; this.server = server; @@ -71,7 +95,7 @@ export class Peer { raw = raw.data; } - this.rxBytes += raw.length || raw.byteLength; + this.rxBytes += raw.length || raw.byteLength || 0; const msg = decode(raw); // console.log('MSG', msg) if (this.status === kConnecting) { @@ -95,12 +119,14 @@ export class Peer { } const close = () => { - this.emit("disconnect", this); this.status = kDisconnected; + Peer.removePeer(this); + this.emit("disconnect", this); } const error = (e) => { console.error("Socket error: ", e); + Peer.removePeer(this); this.sock.close(); this.status = kDisconnected; } @@ -129,28 +155,39 @@ export class Peer { } getStatistics() { + return this.lastStats; + } + + updateStatistics() { const time = Date.now(); - if (time - this.lastStatsCall > 5000 || !this.lastStats) { - this.lastStats = [time - this.lastStatsCall, this.rxBytes, 0]; - this.rxBytes = 0; + const duration = time - this.lastStatsCall; + if (duration < 10) return; + this.lastStats = { + txTotal: this.lastStats.txTotal + this.txBytes, + rxTotal: this.lastStats.rxTotal + this.rxBytes, + txBytes: this.txBytes, + rxBytes: this.rxBytes, + txRate: this.txBytes / duration, + rxRate: this.rxBytes / duration, + txRequested: this.txRequested, + txRatio: (this.txBytes > 0) ? this.txRequested / this.txBytes : 0.0, + duration, + }; this.txBytes = 0; + this.rxBytes = 0; + this.txRequested = 0; this.lastStatsCall = time; - } - return this.lastStats; } sendHandshake() { this.send("__handshake__", kMagic, kVersion, [my_uuid]); } - isBuffering(): boolean { - return this.sendCount > 0; - } - private _send(msg: Buffer | ReturnType<typeof encode>) { - ++this.sendCount; + const bytes = msg.length; + this.txRequested += bytes; this.sock.send(msg, () => { - --this.sendCount; + this.txBytes += bytes; }); } @@ -159,6 +196,7 @@ export class Peer { this.status = kConnected; this.id = id[0]; this.string_id = id[0].toString('hex'); + Peer.addPeer(this); if (!this.server) { this.sendHandshake(); } @@ -309,16 +347,41 @@ export class Peer { /** * Closes the socket */ - close() { - if(!isw3c(this.sock)){ - this.sock.close(); - } - this.status = kDisconnected; + async close() { + if (this.status === kDisconnected) return; + + return new Promise(resolve => { + this.on("disconnect", resolve); + if(!isw3c(this.sock)){ + this.sock.close(); + } + }); } getUuid(): string { return uuid; } + + static interval?: NodeJS.Timer; + + static addPeer(p: Peer) { + Peer.peers.set(p.string_id, p); + if (Peer.peers.size === 1 && !Peer.interval) { + Peer.interval = setInterval(() => { + for (const [id, peer] of Peer.peers) { + peer.updateStatistics(); + } + }, 1000); + } + } + + static removePeer(p: Peer) { + Peer.peers.delete(p.string_id); + if (Peer.peers.size === 0 && Peer.interval) { + clearInterval(Peer.interval); + Peer.interval = undefined; + } + } } ee(Peer.prototype); diff --git a/tests/peer.unit.test.ts b/tests/peer.unit.test.ts index 414d17b5684489c8d7ececafc8f588ebd25e57dc..a86db1a081c10163154b54422bc6ada36e858665 100644 --- a/tests/peer.unit.test.ts +++ b/tests/peer.unit.test.ts @@ -1,10 +1,15 @@ import {Peer} from '../src/Peer'; import WebSocket from 'ws'; +function wait(ms: number) { + return new Promise<void>((resolve) => setTimeout(resolve, ms)); +} + describe("Peer class", () => { let clientPeer: Peer; let serverPeer: Peer; let wss: WebSocket.Server; + let conn: WebSocket; beforeEach(async () => { return new Promise((resolve) => { @@ -14,7 +19,7 @@ describe("Peer class", () => { serverPeer = new Peer(ws, true); }); - const conn = new WebSocket('ws://127.0.0.1:9003'); + conn = new WebSocket('ws://127.0.0.1:9003'); clientPeer = new Peer(conn); clientPeer.on('connect', () => { resolve(true); @@ -23,9 +28,9 @@ describe("Peer class", () => { }); afterEach(async () => { + await clientPeer.close(); + await serverPeer.close(); return new Promise((resolve) => { - clientPeer.close(); - serverPeer.close(); wss.close(() => { resolve(true); }); @@ -91,4 +96,22 @@ describe("Peer class", () => { expect.assertions(3); }); + + it("can generate statistics", async () => { + expect(clientPeer).toBeTruthy(); + expect(serverPeer).toBeTruthy(); + + await new Promise((resolve) => { + serverPeer.bind("test_rpc", (value) => { + expect(value).toBe(50); + resolve(true); + }); + + clientPeer.send("test_rpc", 50); + }); + + await wait(2000); + + expect(serverPeer.getStatistics().txTotal).toBeGreaterThan(0); + }); }); \ No newline at end of file