Parallel queues

I have written a library for interacting with an API, but the requests occasionally stop being sent, and a restart is needed to get them going again.

The API has many endpoints, each of which has its own ratelimit. Many of the endpoints are dynamic: they are not known in advance and can change at runtime.

I tried to build a parallel queue system so that unrelated endpoints cannot affect each other (e.g. when one endpoint is ratelimited or returns some kind of error, or when one endpoint receives an overwhelming number of requests compared to the others). The queue is the first stage of the request process, so it is important that it does not get blocked:

     Queue------|
       |        |
Check Ratelimit-|
       |
  Make Request

(Requests are added to the queue. When a request reaches the front of the queue, the ratelimit for its endpoint is checked: if the ratelimit has not been exceeded, the request is made; otherwise the request is added back to the queue.)
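
To make the intended flow concrete, here is a minimal sketch of it. This is not the library's actual code: the class name EndpointQueues and the isRateLimited and makeRequest callbacks are hypothetical, and the sketch uses one plain array per endpoint instead of a fixed-size buffer.

```javascript
// Hypothetical sketch of the queue -> check ratelimit -> make request flow.
// One plain FIFO array is kept per endpoint key.
class EndpointQueues {
    constructor(isRateLimited, makeRequest) {
        this.queues = new Map();            // key -> array of pending requests
        this.isRateLimited = isRateLimited; // assumed callback: (key) => boolean
        this.makeRequest = makeRequest;     // assumed callback: (key, request) => void
    }

    // Append a request to the queue of its endpoint.
    add(key, request) {
        if (!this.queues.has(key)) this.queues.set(key, []);
        this.queues.get(key).push(request);
    }

    // Process the request at the front of one endpoint's queue.
    // Returns "sent", "requeued" or "empty".
    step(key) {
        const queue = this.queues.get(key);
        if (!queue || queue.length === 0) return "empty";
        const request = queue.shift();
        if (this.isRateLimited(key)) {
            queue.push(request); // added back to the queue, as described above
            return "requeued";
        }
        this.makeRequest(key, request);
        if (queue.length === 0) this.queues.delete(key);
        return "sent";
    }
}
```

In this sketch a ratelimited endpoint only ever touches its own array, so the other endpoints keep moving. Note that adding a request back to the queue puts it behind any requests that arrived after it.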

When a request is added to the queue, the add method is called. The key is a hash of the endpoint, and the value contains the data relevant to the request, including the resolve() and reject() functions of the promise created when the request was initiated.
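
For illustration, the key and value could be built roughly like this. The helper names endpointKey and createEntry, the choice of SHA-1 and the fields method, endpoint and body are assumptions made for this sketch, not the library's real shape.

```javascript
const crypto = require("crypto");

// Hypothetical helper: derive a stable queue key from an endpoint string.
// SHA-1 is an arbitrary choice here; any deterministic hash would do.
function endpointKey(endpoint) {
    return crypto.createHash("sha1").update(endpoint).digest("hex");
}

// Hypothetical helper: build the value stored in the queue, together with
// the promise whose resolve() and reject() functions the value carries.
function createEntry(method, endpoint, body) {
    const entry = { method, endpoint, body, resolve: null, reject: null };
    // The executor runs synchronously, so both functions are set on return.
    const promise = new Promise((resolve, reject) => {
        entry.resolve = resolve;
        entry.reject = reject;
    });
    return { key: endpointKey(endpoint), entry, promise };
}
```

The caller would hand key and entry to add() and return promise to whoever initiated the request; the promise settles once the request code calls entry.resolve() or entry.reject().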

When the request has been completed, the completed method is called and the hash of the endpoint is passed.

If it is discovered that making a request would exceed a ratelimit, the retryLater method is called, with pause being the time in seconds until the request can safely be made.

The request system as a whole must be able to handle at least 5 requests per second.

Unfortunately, the queue seems to get blocked sometimes (some queues keep working, others don't). I assume this is the result of some kind of race condition (perhaps triggered by a burst of requests), but I cannot figure out exactly how.

const EventsEmitter = require("events");

class QueueHandler extends EventsEmitter {
    constructor() {
        super();
        this.queues = {};
    }

    add(key = "null", value) {
        if (!this.queues[key])
            this.queues[key] = { q: new Array(10).fill(null), c: 0, d: 0 };
        this.queues[key].q[this.queues[key].c] = value;
        this.queues[key].c++;
        if (this.queues[key].c > 9)
            this.queues[key].c = 0;
        if (this.queues[key].q.filter(k => k != null).length == 1)
            this.emit("next", key, value);
    }

    completed(key = "null") {
        this.queues[key].q[this.queues[key].d] = null;
        if (this.queues[key].q.filter(k => k != null).length == 0)
            delete this.queues[key];
        else {
            this.queues[key].d++;
            if (this.queues[key].d > 9)
                this.queues[key].d = 0;
            this.emit("next", key, this.queues[key].q[this.queues[key].d]);
        }
    }

    retryLater(key = "null", next = true, pause = 0) {
        this.queues[key].q[this.queues[key].c] = this.queues[key].q[this.queues[key].d];
        this.queues[key].c++;
        if (this.queues[key].c > 9)
            this.queues[key].c = 0;
        this.queues[key].q[this.queues[key].d] = null;
        if (next == true)
            this.next(key);
        else if (pause != 0)
            setTimeout((() => this.next(key)), pause * 1000);
    }
}

module.exports = QueueHandler;

I've tried to find packages to handle the queue for me instead, but I cannot seem to find any parallel queue systems similar to what I am trying to achieve. Perhaps I am tackling this problem incorrectly from the start?


