Skip to content
Snippets Groups Projects
Commit ecc84ba9 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Generate statistics every second

parent 9a672e7b
No related branches found
No related tags found
1 merge request!2Generate statistics every second
...@@ -30,6 +30,18 @@ export interface Peer extends Emitter {} ...@@ -30,6 +30,18 @@ export interface Peer extends Emitter {}
export type RPCMethod = (...args: unknown[]) => unknown; export type RPCMethod = (...args: unknown[]) => unknown;
interface IStatistics {
txTotal: number;
rxTotal: number;
txBytes: number;
rxBytes: number;
txRate: number;
rxRate: number;
txRequested: number;
txRatio: number; // Requested over sent, should be close to 1
duration: number;
};
/** /**
* Wrap a web socket with a MsgPack RCP protocol that works with our C++ version. * Wrap a web socket with a MsgPack RCP protocol that works with our C++ version.
* @param {websocket} ws Websocket object * @param {websocket} ws Websocket object
...@@ -45,7 +57,6 @@ export class Peer { ...@@ -45,7 +57,6 @@ export class Peer {
callbacks = {}; callbacks = {};
cbid = 0; cbid = 0;
server = false; server = false;
sendCount = 0;
latency = 0; latency = 0;
...@@ -54,12 +65,25 @@ export class Peer { ...@@ -54,12 +65,25 @@ export class Peer {
master = false; master = false;
txBytes = 0; txBytes = 0;
txRequested = 0;
rxBytes = 0; rxBytes = 0;
lastStatsCall = Date.now(); lastStatsCall = Date.now();
lastStats = null; lastStats: IStatistics = {
txTotal: 0,
rxTotal: 0,
txBytes: 0,
rxBytes: 0,
txRate: 0,
rxRate: 0,
txRequested: 0,
txRatio: 0,
duration: 0,
};
static uuid: string; static uuid: string;
static peers: Map<string, Peer> = new Map();
constructor(ws?: WebSocketConnection, server = false) { constructor(ws?: WebSocketConnection, server = false) {
this.sock = ws; this.sock = ws;
this.server = server; this.server = server;
...@@ -71,7 +95,7 @@ export class Peer { ...@@ -71,7 +95,7 @@ export class Peer {
raw = raw.data; raw = raw.data;
} }
this.rxBytes += raw.length || raw.byteLength; this.rxBytes += raw.length || raw.byteLength || 0;
const msg = decode(raw); const msg = decode(raw);
// console.log('MSG', msg) // console.log('MSG', msg)
if (this.status === kConnecting) { if (this.status === kConnecting) {
...@@ -95,12 +119,14 @@ export class Peer { ...@@ -95,12 +119,14 @@ export class Peer {
} }
const close = () => { const close = () => {
this.emit("disconnect", this);
this.status = kDisconnected; this.status = kDisconnected;
Peer.removePeer(this);
this.emit("disconnect", this);
} }
const error = (e) => { const error = (e) => {
console.error("Socket error: ", e); console.error("Socket error: ", e);
Peer.removePeer(this);
this.sock.close(); this.sock.close();
this.status = kDisconnected; this.status = kDisconnected;
} }
...@@ -129,28 +155,39 @@ export class Peer { ...@@ -129,28 +155,39 @@ export class Peer {
} }
getStatistics() { getStatistics() {
return this.lastStats;
}
updateStatistics() {
const time = Date.now(); const time = Date.now();
if (time - this.lastStatsCall > 5000 || !this.lastStats) { const duration = time - this.lastStatsCall;
this.lastStats = [time - this.lastStatsCall, this.rxBytes, 0]; if (duration < 10) return;
this.rxBytes = 0; this.lastStats = {
txTotal: this.lastStats.txTotal + this.txBytes,
rxTotal: this.lastStats.rxTotal + this.rxBytes,
txBytes: this.txBytes,
rxBytes: this.rxBytes,
txRate: this.txBytes / duration,
rxRate: this.rxBytes / duration,
txRequested: this.txRequested,
txRatio: (this.txBytes > 0) ? this.txRequested / this.txBytes : 0.0,
duration,
};
this.txBytes = 0; this.txBytes = 0;
this.rxBytes = 0;
this.txRequested = 0;
this.lastStatsCall = time; this.lastStatsCall = time;
} }
return this.lastStats;
}
sendHandshake() { sendHandshake() {
this.send("__handshake__", kMagic, kVersion, [my_uuid]); this.send("__handshake__", kMagic, kVersion, [my_uuid]);
} }
isBuffering(): boolean {
return this.sendCount > 0;
}
private _send(msg: Buffer | ReturnType<typeof encode>) { private _send(msg: Buffer | ReturnType<typeof encode>) {
++this.sendCount; const bytes = msg.length;
this.txRequested += bytes;
this.sock.send(msg, () => { this.sock.send(msg, () => {
--this.sendCount; this.txBytes += bytes;
}); });
} }
...@@ -159,6 +196,7 @@ export class Peer { ...@@ -159,6 +196,7 @@ export class Peer {
this.status = kConnected; this.status = kConnected;
this.id = id[0]; this.id = id[0];
this.string_id = id[0].toString('hex'); this.string_id = id[0].toString('hex');
Peer.addPeer(this);
if (!this.server) { if (!this.server) {
this.sendHandshake(); this.sendHandshake();
} }
...@@ -309,16 +347,41 @@ export class Peer { ...@@ -309,16 +347,41 @@ export class Peer {
/** /**
* Closes the socket * Closes the socket
*/ */
close() { async close() {
if (this.status === kDisconnected) return;
return new Promise(resolve => {
this.on("disconnect", resolve);
if(!isw3c(this.sock)){ if(!isw3c(this.sock)){
this.sock.close(); this.sock.close();
} }
this.status = kDisconnected; });
} }
getUuid(): string { getUuid(): string {
return uuid; return uuid;
} }
static interval?: NodeJS.Timer;
static addPeer(p: Peer) {
Peer.peers.set(p.string_id, p);
if (Peer.peers.size === 1 && !Peer.interval) {
Peer.interval = setInterval(() => {
for (const [id, peer] of Peer.peers) {
peer.updateStatistics();
}
}, 1000);
}
}
static removePeer(p: Peer) {
Peer.peers.delete(p.string_id);
if (Peer.peers.size === 0 && Peer.interval) {
clearInterval(Peer.interval);
Peer.interval = undefined;
}
}
} }
ee(Peer.prototype); ee(Peer.prototype);
......
import {Peer} from '../src/Peer'; import {Peer} from '../src/Peer';
import WebSocket from 'ws'; import WebSocket from 'ws';
function wait(ms: number) {
return new Promise<void>((resolve) => setTimeout(resolve, ms));
}
describe("Peer class", () => { describe("Peer class", () => {
let clientPeer: Peer; let clientPeer: Peer;
let serverPeer: Peer; let serverPeer: Peer;
let wss: WebSocket.Server; let wss: WebSocket.Server;
let conn: WebSocket;
beforeEach(async () => { beforeEach(async () => {
return new Promise((resolve) => { return new Promise((resolve) => {
...@@ -14,7 +19,7 @@ describe("Peer class", () => { ...@@ -14,7 +19,7 @@ describe("Peer class", () => {
serverPeer = new Peer(ws, true); serverPeer = new Peer(ws, true);
}); });
const conn = new WebSocket('ws://127.0.0.1:9003'); conn = new WebSocket('ws://127.0.0.1:9003');
clientPeer = new Peer(conn); clientPeer = new Peer(conn);
clientPeer.on('connect', () => { clientPeer.on('connect', () => {
resolve(true); resolve(true);
...@@ -23,9 +28,9 @@ describe("Peer class", () => { ...@@ -23,9 +28,9 @@ describe("Peer class", () => {
}); });
afterEach(async () => { afterEach(async () => {
await clientPeer.close();
await serverPeer.close();
return new Promise((resolve) => { return new Promise((resolve) => {
clientPeer.close();
serverPeer.close();
wss.close(() => { wss.close(() => {
resolve(true); resolve(true);
}); });
...@@ -91,4 +96,22 @@ describe("Peer class", () => { ...@@ -91,4 +96,22 @@ describe("Peer class", () => {
expect.assertions(3); expect.assertions(3);
}); });
it("can generate statistics", async () => {
expect(clientPeer).toBeTruthy();
expect(serverPeer).toBeTruthy();
await new Promise((resolve) => {
serverPeer.bind("test_rpc", (value) => {
expect(value).toBe(50);
resolve(true);
});
clientPeer.send("test_rpc", 50);
});
await wait(2000);
expect(serverPeer.getStatistics().txTotal).toBeGreaterThan(0);
});
}); });
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment