Skip to content
Snippets Groups Projects
Commit 40aa21ee authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Add backpressure check

parent d227a50d
Branches
Tags
1 merge request!1Add backpressure check
Pipeline #57332 passed
......@@ -45,6 +45,7 @@ export class Peer {
callbacks = {};
cbid = 0;
server = false;
sendCount = 0;
latency = 0;
......@@ -142,6 +143,17 @@ export class Peer {
this.send("__handshake__", kMagic, kVersion, [my_uuid]);
}
isBuffering(): boolean {
return this.sendCount > 0;
}
private _send(msg: Buffer | ReturnType<typeof encode>) {
++this.sendCount;
this.sock.send(msg, () => {
--this.sendCount;
});
}
private _handshake(magic: number, version: number, id: Buffer[]) {
if (magic == kMagic) {
this.status = kConnected;
......@@ -175,25 +187,25 @@ export class Peer {
const res = this.bindings[name].apply(this, args);
if (res instanceof Promise) {
res.then(r => {
this.sock.send(encode([1,id,null,r]));
this._send(encode([1,id,null,r]));
});
} else {
this.sock.send(encode([1,id,null,res]));
this._send(encode([1,id,null,res]));
}
} catch(e) {
// console.error("Could to dispatch or return call", e);
// this.close();
this.sock.send(encode([1,id,e.toString(),null]));
this._send(encode([1,id,e.toString(),null]));
}
} else if (name in this.proxies) {
//console.log("Proxy for:", name, id);
args.unshift((res: unknown) => {
try {
this.sock.send(encode([1,id,null,res]));
this._send(encode([1,id,null,res]));
} catch(e) {
// console.log("ERROR")
// this.close();
this.sock.send(encode([1,id,e.toString(),null]));
this._send(encode([1,id,e.toString(),null]));
}
});
this.proxies[name].apply(this, args);
......@@ -264,7 +276,7 @@ export class Peer {
this.callbacks[id] = (r) => resolve(r);
try {
this.sock.send(encode([0, id, name, args]));
this._send(encode([0, id, name, args]));
} catch(e) {
this.close();
reject();
......@@ -280,7 +292,7 @@ export class Peer {
*/
send(name: string, ...args: unknown[]) {
try {
this.sock.send(encode([0, name, args]));
this._send(encode([0, name, args]));
} catch(e) {
this.close();
}
......@@ -288,7 +300,7 @@ export class Peer {
sendB(name: string, args: unknown[]) {
try {
this.sock.send(encode([0, name, args]));
this._send(encode([0, name, args]));
} catch(e) {
this.close();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment