Skip to content
Snippets Groups Projects
Commit 418631b8 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/statistics' into 'main'

Generate statistics every second

See merge request !2
parents 9a672e7b ecc84ba9
Branches
Tags 0.1.4
1 merge request!2Generate statistics every second
Pipeline #57412 passed
...@@ -30,6 +30,18 @@ export interface Peer extends Emitter {} ...@@ -30,6 +30,18 @@ export interface Peer extends Emitter {}
export type RPCMethod = (...args: unknown[]) => unknown; export type RPCMethod = (...args: unknown[]) => unknown;
interface IStatistics {
txTotal: number;
rxTotal: number;
txBytes: number;
rxBytes: number;
txRate: number;
rxRate: number;
txRequested: number;
txRatio: number; // Requested over sent, should be close to 1
duration: number;
};
/** /**
* Wrap a web socket with a MsgPack RCP protocol that works with our C++ version. * Wrap a web socket with a MsgPack RCP protocol that works with our C++ version.
* @param {websocket} ws Websocket object * @param {websocket} ws Websocket object
...@@ -45,7 +57,6 @@ export class Peer { ...@@ -45,7 +57,6 @@ export class Peer {
callbacks = {}; callbacks = {};
cbid = 0; cbid = 0;
server = false; server = false;
sendCount = 0;
latency = 0; latency = 0;
...@@ -54,12 +65,25 @@ export class Peer { ...@@ -54,12 +65,25 @@ export class Peer {
master = false; master = false;
txBytes = 0; txBytes = 0;
txRequested = 0;
rxBytes = 0; rxBytes = 0;
lastStatsCall = Date.now(); lastStatsCall = Date.now();
lastStats = null; lastStats: IStatistics = {
txTotal: 0,
rxTotal: 0,
txBytes: 0,
rxBytes: 0,
txRate: 0,
rxRate: 0,
txRequested: 0,
txRatio: 0,
duration: 0,
};
static uuid: string; static uuid: string;
static peers: Map<string, Peer> = new Map();
constructor(ws?: WebSocketConnection, server = false) { constructor(ws?: WebSocketConnection, server = false) {
this.sock = ws; this.sock = ws;
this.server = server; this.server = server;
...@@ -71,7 +95,7 @@ export class Peer { ...@@ -71,7 +95,7 @@ export class Peer {
raw = raw.data; raw = raw.data;
} }
this.rxBytes += raw.length || raw.byteLength; this.rxBytes += raw.length || raw.byteLength || 0;
const msg = decode(raw); const msg = decode(raw);
// console.log('MSG', msg) // console.log('MSG', msg)
if (this.status === kConnecting) { if (this.status === kConnecting) {
...@@ -95,12 +119,14 @@ export class Peer { ...@@ -95,12 +119,14 @@ export class Peer {
} }
const close = () => { const close = () => {
this.emit("disconnect", this);
this.status = kDisconnected; this.status = kDisconnected;
Peer.removePeer(this);
this.emit("disconnect", this);
} }
const error = (e) => { const error = (e) => {
console.error("Socket error: ", e); console.error("Socket error: ", e);
Peer.removePeer(this);
this.sock.close(); this.sock.close();
this.status = kDisconnected; this.status = kDisconnected;
} }
...@@ -129,28 +155,39 @@ export class Peer { ...@@ -129,28 +155,39 @@ export class Peer {
} }
getStatistics() { getStatistics() {
return this.lastStats;
}
updateStatistics() {
const time = Date.now(); const time = Date.now();
if (time - this.lastStatsCall > 5000 || !this.lastStats) { const duration = time - this.lastStatsCall;
this.lastStats = [time - this.lastStatsCall, this.rxBytes, 0]; if (duration < 10) return;
this.rxBytes = 0; this.lastStats = {
txTotal: this.lastStats.txTotal + this.txBytes,
rxTotal: this.lastStats.rxTotal + this.rxBytes,
txBytes: this.txBytes,
rxBytes: this.rxBytes,
txRate: this.txBytes / duration,
rxRate: this.rxBytes / duration,
txRequested: this.txRequested,
txRatio: (this.txBytes > 0) ? this.txRequested / this.txBytes : 0.0,
duration,
};
this.txBytes = 0; this.txBytes = 0;
this.rxBytes = 0;
this.txRequested = 0;
this.lastStatsCall = time; this.lastStatsCall = time;
} }
return this.lastStats;
}
sendHandshake() { sendHandshake() {
this.send("__handshake__", kMagic, kVersion, [my_uuid]); this.send("__handshake__", kMagic, kVersion, [my_uuid]);
} }
isBuffering(): boolean {
return this.sendCount > 0;
}
private _send(msg: Buffer | ReturnType<typeof encode>) { private _send(msg: Buffer | ReturnType<typeof encode>) {
++this.sendCount; const bytes = msg.length;
this.txRequested += bytes;
this.sock.send(msg, () => { this.sock.send(msg, () => {
--this.sendCount; this.txBytes += bytes;
}); });
} }
...@@ -159,6 +196,7 @@ export class Peer { ...@@ -159,6 +196,7 @@ export class Peer {
this.status = kConnected; this.status = kConnected;
this.id = id[0]; this.id = id[0];
this.string_id = id[0].toString('hex'); this.string_id = id[0].toString('hex');
Peer.addPeer(this);
if (!this.server) { if (!this.server) {
this.sendHandshake(); this.sendHandshake();
} }
...@@ -309,16 +347,41 @@ export class Peer { ...@@ -309,16 +347,41 @@ export class Peer {
/** /**
* Closes the socket * Closes the socket
*/ */
close() { async close() {
if (this.status === kDisconnected) return;
return new Promise(resolve => {
this.on("disconnect", resolve);
if(!isw3c(this.sock)){ if(!isw3c(this.sock)){
this.sock.close(); this.sock.close();
} }
this.status = kDisconnected; });
} }
getUuid(): string { getUuid(): string {
return uuid; return uuid;
} }
static interval?: NodeJS.Timer;
static addPeer(p: Peer) {
Peer.peers.set(p.string_id, p);
if (Peer.peers.size === 1 && !Peer.interval) {
Peer.interval = setInterval(() => {
for (const [id, peer] of Peer.peers) {
peer.updateStatistics();
}
}, 1000);
}
}
static removePeer(p: Peer) {
Peer.peers.delete(p.string_id);
if (Peer.peers.size === 0 && Peer.interval) {
clearInterval(Peer.interval);
Peer.interval = undefined;
}
}
} }
ee(Peer.prototype); ee(Peer.prototype);
......
import {Peer} from '../src/Peer'; import {Peer} from '../src/Peer';
import WebSocket from 'ws'; import WebSocket from 'ws';
function wait(ms: number) {
return new Promise<void>((resolve) => setTimeout(resolve, ms));
}
describe("Peer class", () => { describe("Peer class", () => {
let clientPeer: Peer; let clientPeer: Peer;
let serverPeer: Peer; let serverPeer: Peer;
let wss: WebSocket.Server; let wss: WebSocket.Server;
let conn: WebSocket;
beforeEach(async () => { beforeEach(async () => {
return new Promise((resolve) => { return new Promise((resolve) => {
...@@ -14,7 +19,7 @@ describe("Peer class", () => { ...@@ -14,7 +19,7 @@ describe("Peer class", () => {
serverPeer = new Peer(ws, true); serverPeer = new Peer(ws, true);
}); });
const conn = new WebSocket('ws://127.0.0.1:9003'); conn = new WebSocket('ws://127.0.0.1:9003');
clientPeer = new Peer(conn); clientPeer = new Peer(conn);
clientPeer.on('connect', () => { clientPeer.on('connect', () => {
resolve(true); resolve(true);
...@@ -23,9 +28,9 @@ describe("Peer class", () => { ...@@ -23,9 +28,9 @@ describe("Peer class", () => {
}); });
afterEach(async () => { afterEach(async () => {
await clientPeer.close();
await serverPeer.close();
return new Promise((resolve) => { return new Promise((resolve) => {
clientPeer.close();
serverPeer.close();
wss.close(() => { wss.close(() => {
resolve(true); resolve(true);
}); });
...@@ -91,4 +96,22 @@ describe("Peer class", () => { ...@@ -91,4 +96,22 @@ describe("Peer class", () => {
expect.assertions(3); expect.assertions(3);
}); });
it("can generate statistics", async () => {
expect(clientPeer).toBeTruthy();
expect(serverPeer).toBeTruthy();
await new Promise((resolve) => {
serverPeer.bind("test_rpc", (value) => {
expect(value).toBe(50);
resolve(true);
});
clientPeer.send("test_rpc", 50);
});
await wait(2000);
expect(serverPeer.getStatistics().txTotal).toBeGreaterThan(0);
});
}); });
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment