Skip to content
Snippets Groups Projects

Generate statistics every second

Merged Nicolas Pope requested to merge feature/statistics into main
2 files
+ 109
23
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 83
20
@@ -30,6 +30,18 @@ export interface Peer extends Emitter {}
export type RPCMethod = (...args: unknown[]) => unknown;
interface IStatistics {
txTotal: number;
rxTotal: number;
txBytes: number;
rxBytes: number;
txRate: number;
rxRate: number;
txRequested: number;
txRatio: number; // Requested over sent, should be close to 1
duration: number;
};
/**
* Wrap a web socket with a MsgPack RCP protocol that works with our C++ version.
* @param {websocket} ws Websocket object
@@ -45,7 +57,6 @@ export class Peer {
callbacks = {};
cbid = 0;
server = false;
sendCount = 0;
latency = 0;
@@ -54,12 +65,25 @@ export class Peer {
master = false;
txBytes = 0;
txRequested = 0;
rxBytes = 0;
lastStatsCall = Date.now();
lastStats = null;
lastStats: IStatistics = {
txTotal: 0,
rxTotal: 0,
txBytes: 0,
rxBytes: 0,
txRate: 0,
rxRate: 0,
txRequested: 0,
txRatio: 0,
duration: 0,
};
static uuid: string;
static peers: Map<string, Peer> = new Map();
constructor(ws?: WebSocketConnection, server = false) {
this.sock = ws;
this.server = server;
@@ -71,7 +95,7 @@ export class Peer {
raw = raw.data;
}
this.rxBytes += raw.length || raw.byteLength;
this.rxBytes += raw.length || raw.byteLength || 0;
const msg = decode(raw);
// console.log('MSG', msg)
if (this.status === kConnecting) {
@@ -95,12 +119,14 @@ export class Peer {
}
const close = () => {
this.emit("disconnect", this);
this.status = kDisconnected;
Peer.removePeer(this);
this.emit("disconnect", this);
}
const error = (e) => {
console.error("Socket error: ", e);
Peer.removePeer(this);
this.sock.close();
this.status = kDisconnected;
}
@@ -129,28 +155,39 @@ export class Peer {
}
getStatistics() {
return this.lastStats;
}
updateStatistics() {
const time = Date.now();
if (time - this.lastStatsCall > 5000 || !this.lastStats) {
this.lastStats = [time - this.lastStatsCall, this.rxBytes, 0];
this.rxBytes = 0;
const duration = time - this.lastStatsCall;
if (duration < 10) return;
this.lastStats = {
txTotal: this.lastStats.txTotal + this.txBytes,
rxTotal: this.lastStats.rxTotal + this.rxBytes,
txBytes: this.txBytes,
rxBytes: this.rxBytes,
txRate: this.txBytes / duration,
rxRate: this.rxBytes / duration,
txRequested: this.txRequested,
txRatio: (this.txBytes > 0) ? this.txRequested / this.txBytes : 0.0,
duration,
};
this.txBytes = 0;
this.rxBytes = 0;
this.txRequested = 0;
this.lastStatsCall = time;
}
return this.lastStats;
}
sendHandshake() {
this.send("__handshake__", kMagic, kVersion, [my_uuid]);
}
isBuffering(): boolean {
return this.sendCount > 0;
}
private _send(msg: Buffer | ReturnType<typeof encode>) {
++this.sendCount;
const bytes = msg.length;
this.txRequested += bytes;
this.sock.send(msg, () => {
--this.sendCount;
this.txBytes += bytes;
});
}
@@ -159,6 +196,7 @@ export class Peer {
this.status = kConnected;
this.id = id[0];
this.string_id = id[0].toString('hex');
Peer.addPeer(this);
if (!this.server) {
this.sendHandshake();
}
@@ -309,16 +347,41 @@ export class Peer {
/**
* Closes the socket
*/
close() {
if(!isw3c(this.sock)){
this.sock.close();
}
this.status = kDisconnected;
async close() {
if (this.status === kDisconnected) return;
return new Promise(resolve => {
this.on("disconnect", resolve);
if(!isw3c(this.sock)){
this.sock.close();
}
});
}
getUuid(): string {
return uuid;
}
static interval?: NodeJS.Timer;
static addPeer(p: Peer) {
Peer.peers.set(p.string_id, p);
if (Peer.peers.size === 1 && !Peer.interval) {
Peer.interval = setInterval(() => {
for (const [id, peer] of Peer.peers) {
peer.updateStatistics();
}
}, 1000);
}
}
static removePeer(p: Peer) {
Peer.peers.delete(p.string_id);
if (Peer.peers.size === 0 && Peer.interval) {
clearInterval(Peer.interval);
Peer.interval = undefined;
}
}
}
ee(Peer.prototype);
Loading