You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
230 lines
7.3 KiB
230 lines
7.3 KiB
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.Throttler = exports.timeToWait = exports.backoff = void 0;
|
|
const logger_1 = require("../logger");
|
|
const retries_exhausted_error_1 = require("./errors/retries-exhausted-error");
|
|
const timeout_error_1 = require("./errors/timeout-error");
|
|
function backoff(retryNumber, delay, maxDelay) {
|
|
return new Promise((resolve) => {
|
|
setTimeout(resolve, timeToWait(retryNumber, delay, maxDelay));
|
|
});
|
|
}
|
|
exports.backoff = backoff;
|
|
function timeToWait(retryNumber, delay, maxDelay) {
|
|
return Math.min(delay * Math.pow(2, retryNumber), maxDelay);
|
|
}
|
|
exports.timeToWait = timeToWait;
|
|
function DEFAULT_HANDLER(task) {
|
|
return task();
|
|
}
|
|
class Throttler {
|
|
constructor(options) {
|
|
this.name = "";
|
|
this.concurrency = 200;
|
|
this.handler = DEFAULT_HANDLER;
|
|
this.active = 0;
|
|
this.complete = 0;
|
|
this.success = 0;
|
|
this.errored = 0;
|
|
this.retried = 0;
|
|
this.total = 0;
|
|
this.taskDataMap = new Map();
|
|
this.waits = [];
|
|
this.min = 9999999999;
|
|
this.max = 0;
|
|
this.avg = 0;
|
|
this.retries = 0;
|
|
this.backoff = 200;
|
|
this.maxBackoff = 60000;
|
|
this.closed = false;
|
|
this.finished = false;
|
|
this.startTime = 0;
|
|
if (options.name) {
|
|
this.name = options.name;
|
|
}
|
|
if (options.handler) {
|
|
this.handler = options.handler;
|
|
}
|
|
if (typeof options.concurrency === "number") {
|
|
this.concurrency = options.concurrency;
|
|
}
|
|
if (typeof options.retries === "number") {
|
|
this.retries = options.retries;
|
|
}
|
|
if (typeof options.backoff === "number") {
|
|
this.backoff = options.backoff;
|
|
}
|
|
if (typeof options.maxBackoff === "number") {
|
|
this.maxBackoff = options.maxBackoff;
|
|
}
|
|
}
|
|
wait() {
|
|
const p = new Promise((resolve, reject) => {
|
|
this.waits.push({ resolve, reject });
|
|
});
|
|
return p;
|
|
}
|
|
add(task, timeoutMillis) {
|
|
this.addHelper(task, timeoutMillis);
|
|
}
|
|
run(task, timeoutMillis) {
|
|
return new Promise((resolve, reject) => {
|
|
this.addHelper(task, timeoutMillis, { resolve, reject });
|
|
});
|
|
}
|
|
close() {
|
|
this.closed = true;
|
|
return this.finishIfIdle();
|
|
}
|
|
process() {
|
|
if (this.finishIfIdle() || this.active >= this.concurrency || !this.hasWaitingTask()) {
|
|
return;
|
|
}
|
|
this.active++;
|
|
this.handle(this.nextWaitingTaskIndex());
|
|
}
|
|
async handle(cursorIndex) {
|
|
const taskData = this.taskDataMap.get(cursorIndex);
|
|
if (!taskData) {
|
|
throw new Error(`taskData.get(${cursorIndex}) does not exist`);
|
|
}
|
|
const promises = [this.executeTask(cursorIndex)];
|
|
if (taskData.timeoutMillis) {
|
|
promises.push(this.initializeTimeout(cursorIndex));
|
|
}
|
|
let result;
|
|
try {
|
|
result = await Promise.race(promises);
|
|
}
|
|
catch (err) {
|
|
this.errored++;
|
|
this.complete++;
|
|
this.active--;
|
|
this.onTaskFailed(err, cursorIndex);
|
|
return;
|
|
}
|
|
this.success++;
|
|
this.complete++;
|
|
this.active--;
|
|
this.onTaskFulfilled(result, cursorIndex);
|
|
}
|
|
stats() {
|
|
return {
|
|
max: this.max,
|
|
min: this.min,
|
|
avg: this.avg,
|
|
active: this.active,
|
|
complete: this.complete,
|
|
success: this.success,
|
|
errored: this.errored,
|
|
retried: this.retried,
|
|
total: this.total,
|
|
elapsed: Date.now() - this.startTime,
|
|
};
|
|
}
|
|
taskName(cursorIndex) {
|
|
const taskData = this.taskDataMap.get(cursorIndex);
|
|
if (!taskData) {
|
|
return "finished task";
|
|
}
|
|
return typeof taskData.task === "string" ? taskData.task : `index ${cursorIndex}`;
|
|
}
|
|
addHelper(task, timeoutMillis, wait) {
|
|
if (this.closed) {
|
|
throw new Error("Cannot add a task to a closed throttler.");
|
|
}
|
|
if (!this.startTime) {
|
|
this.startTime = Date.now();
|
|
}
|
|
this.taskDataMap.set(this.total, {
|
|
task,
|
|
wait,
|
|
timeoutMillis,
|
|
retryCount: 0,
|
|
isTimedOut: false,
|
|
});
|
|
this.total++;
|
|
this.process();
|
|
}
|
|
finishIfIdle() {
|
|
if (this.closed && !this.hasWaitingTask() && this.active === 0) {
|
|
this.finish();
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
finish(err) {
|
|
this.waits.forEach((p) => {
|
|
if (err) {
|
|
return p.reject(err);
|
|
}
|
|
this.finished = true;
|
|
return p.resolve();
|
|
});
|
|
}
|
|
initializeTimeout(cursorIndex) {
|
|
const taskData = this.taskDataMap.get(cursorIndex);
|
|
const timeoutMillis = taskData.timeoutMillis;
|
|
const timeoutPromise = new Promise((_, reject) => {
|
|
taskData.timeoutId = setTimeout(() => {
|
|
taskData.isTimedOut = true;
|
|
reject(new timeout_error_1.default(this.taskName(cursorIndex), timeoutMillis));
|
|
}, timeoutMillis);
|
|
});
|
|
return timeoutPromise;
|
|
}
|
|
async executeTask(cursorIndex) {
|
|
const taskData = this.taskDataMap.get(cursorIndex);
|
|
const t0 = Date.now();
|
|
let result;
|
|
try {
|
|
result = await this.handler(taskData.task);
|
|
}
|
|
catch (err) {
|
|
if (taskData.retryCount === this.retries) {
|
|
throw new retries_exhausted_error_1.default(this.taskName(cursorIndex), this.retries, err);
|
|
}
|
|
await backoff(taskData.retryCount + 1, this.backoff, this.maxBackoff);
|
|
if (taskData.isTimedOut) {
|
|
throw new timeout_error_1.default(this.taskName(cursorIndex), taskData.timeoutMillis);
|
|
}
|
|
this.retried++;
|
|
taskData.retryCount++;
|
|
logger_1.logger.debug(`[${this.name}] Retrying task`, this.taskName(cursorIndex));
|
|
return this.executeTask(cursorIndex);
|
|
}
|
|
if (taskData.isTimedOut) {
|
|
throw new timeout_error_1.default(this.taskName(cursorIndex), taskData.timeoutMillis);
|
|
}
|
|
const dt = Date.now() - t0;
|
|
this.min = Math.min(dt, this.min);
|
|
this.max = Math.max(dt, this.max);
|
|
this.avg = (this.avg * this.complete + dt) / (this.complete + 1);
|
|
return result;
|
|
}
|
|
onTaskFulfilled(result, cursorIndex) {
|
|
const taskData = this.taskDataMap.get(cursorIndex);
|
|
if (taskData.wait) {
|
|
taskData.wait.resolve(result);
|
|
}
|
|
this.cleanupTask(cursorIndex);
|
|
this.process();
|
|
}
|
|
onTaskFailed(error, cursorIndex) {
|
|
const taskData = this.taskDataMap.get(cursorIndex);
|
|
logger_1.logger.debug(error);
|
|
if (taskData.wait) {
|
|
taskData.wait.reject(error);
|
|
}
|
|
this.cleanupTask(cursorIndex);
|
|
this.finish(error);
|
|
}
|
|
cleanupTask(cursorIndex) {
|
|
const { timeoutId } = this.taskDataMap.get(cursorIndex);
|
|
if (timeoutId) {
|
|
clearTimeout(timeoutId);
|
|
}
|
|
this.taskDataMap.delete(cursorIndex);
|
|
}
|
|
}
|
|
exports.Throttler = Throttler;
|