Bull: Unique queue events

Created on 20 Dec 2018 · 6 Comments · Source: OptimalBits/bull

Suppose I have two servers, both handling the same queue. One of them adds jobs, the other processes them. If I listen to events on the queue, will they fire on both servers?

// server1
const Queue = require('bull');
const myQueue = new Queue('mine');

myQueue.add(/* ... */);
myQueue.add(/* ... */);

myQueue.on('completed', () => { /* ... */ });
myQueue.on('failed', () => { /* ... */ });
myQueue.on('active', () => { /* ... */ });
myQueue.on('waiting', () => { /* ... */ });

// server2
const Queue = require('bull');
const myQueue = new Queue('mine');

myQueue.process(job => { /* ... */ });

myQueue.on('completed', () => { /* ... */ });
myQueue.on('failed', () => { /* ... */ });
myQueue.on('active', () => { /* ... */ });
myQueue.on('waiting', () => { /* ... */ });

I'm asking because we'd like to track these metrics in our metrics system (DataDog), but I'm afraid the events will fire on both servers and duplicate our metrics. We have 7 servers, so would every metric for every job be sent to DataDog 7 times, once per server?

If so, how could we get reliable metrics?

All 6 comments

You will only receive the events on the same worker that produces them, unless you prefix the event name with global:, in which case you will receive the events produced by all the workers.
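To make the local/global distinction above concrete, here is a minimal sketch in JavaScript. The helper name watchCompleted and the stubQueue stand-in are hypothetical and exist only so the snippet runs without Redis; with Bull you would pass a real queue instead. It assumes, as Bull's documentation describes, that global listeners receive the job id rather than the job object.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: registers one local and one global 'completed' listener.
// `queue` is meant to be a Bull queue; any EventEmitter works for this sketch.
function watchCompleted(queue, log) {
  // Local: fires only in the process whose worker completed the job.
  queue.on('completed', (job, result) => log.push(`local:${job.id}`));
  // Global: fires in every process that registered it; receives the job id.
  queue.on('global:completed', (jobId, result) => log.push(`global:${jobId}`));
  return log;
}

// Stand-in for a Bull queue (assumption, not part of Bull).
const stubQueue = new EventEmitter();
const seen = watchCompleted(stubQueue, []);

// Simulate what the processing worker would emit.
stubQueue.emit('completed', { id: '1' }, 'ok');
stubQueue.emit('global:completed', '1', '"ok"');
```

For metrics, this suggests choosing one style per counter: local listeners on every worker (each event counted once, by the process that produced it), or global listeners in a single dedicated process. Registering global listeners on all 7 servers would count each event 7 times.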

I understand that, but given that the job is added by one worker and processed by the other, will both receive the active and completed events? If not, which one will receive which events?

The worker will produce the event.

Okay, so it will only fire once. Follow-up question: is the same true for queue.getJobCounts(), or will that report _all_ jobs?

Right now I have this code. Based on your answers, this should give us accurate measurements of the health of our job queue, right?

  queue.on('stalled', job => {
    statsd.increment('jobs.active', -1, {
      queue: name,
    });
  });

  queue.on('waiting', () => {
    statsd.increment('jobs.waiting', 1, {
      queue: name,
    });
  });

  queue.on('active', () => {
    statsd.increment('jobs.waiting', -1, {
      queue: name,
    });
    statsd.increment('jobs.active', 1, {
      queue: name,
    });
  });

  queue.on('completed', () => {
    statsd.increment('jobs.active', -1, {
      queue: name,
    });
    statsd.increment('jobs.completed', 1, {
      queue: name,
    });
  });

  queue.on('error', err => {
    statsd.increment('jobs.active', -1, {
      queue: name,
    });
    statsd.increment('jobs.errored', 1, {
      queue: name,
    });
  });

  queue.on('failed', (job, err) => {
    statsd.increment('jobs.active', -1, {
      queue: name,
    });
    statsd.increment('jobs.failed', 1, {
      queue: name,
    });
  });

Thanks for the help! :pray:

@mxstbr I think that is the best that can be achieved today. In Bull 4 we will use streams for events, and then you will get very reliable events, never lost even in case of intermittent connections.
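On the queue.getJobCounts() follow-up, which the thread leaves unanswered: as far as I understand, the counts are read from Redis, so they should describe the whole queue rather than a single worker. Under that assumption, a gauge-based sketch like the one below could complement the event counters. The helper name reportJobCounts and the fakeQueue and fakeStatsd stand-ins are hypothetical; the statsd client is assumed to expose gauge(name, value, tags) in the same style as the increment calls above.

```javascript
// Hypothetical helper: publish queue-wide job counts as gauges.
// `queue` is assumed to be a Bull queue, `statsd` a client with gauge(name, value, tags).
async function reportJobCounts(queue, statsd, name) {
  // Expected shape: { waiting, active, completed, failed, delayed }
  const counts = await queue.getJobCounts();
  for (const [state, count] of Object.entries(counts)) {
    statsd.gauge(`jobs.${state}`, count, { queue: name });
  }
  return counts;
}

// Stand-ins so the sketch runs without Redis or a StatsD agent (assumptions).
const recorded = [];
const fakeStatsd = {
  gauge: (stat, value, tags) => recorded.push([stat, value, tags.queue]),
};
const fakeQueue = {
  getJobCounts: async () => ({ waiting: 2, active: 1, completed: 5, failed: 0, delayed: 0 }),
};

const reported = reportJobCounts(fakeQueue, fakeStatsd, 'mine');
```

Because gauges report absolute state, it would be enough to call this on a timer from one process; running it on all servers would only overwrite the same values.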
