Bull: Best practice for closing the zombie jobs?

Created on 27 Nov 2018  路  1Comment  路  Source: OptimalBits/bull

Imagine that we have an application, with initialized queue and a few jobs were started. After it, the process was closed with e.g. SIGINT and this jobs were not closed correctly. What snippets do you use for handling that kind of situations and closing all the current jobs after some signal received or about it?

Thanks in advance!

question

Most helpful comment

In a script that starts workers for an array of queues that are processing jobs, queues, I've got something which uses Bull's Queue#pause to stop the workers from starting new jobs while letting them finish their current jobs (unless forced). Find the current implementation of pause here.

Hope this helps!

const queues = [emailQueue, webhookQueue, etc];

process.on("SIGTERM", async () => {
  setTimeout(() => {
    console.warn("Couldn't pause all queues within 30s, sorry! Exiting.");
    process.exit(1);
  }, 30000);

  await pauseAllQueues();
  process.exit(0);
});

let sawSIGINT;

process.on("SIGINT", async () => {
  if (sawSIGINT) {
    console.warn(`Killing the jobs. You monster.`);
    process.exit(1);
  }
  sawSIGINT = true;

  if (!["staging", "production"].includes(process.env.ENVIRONMENT)) {
    console.log(`
      Pausing the queue workers so they finish any current running jobs.
      Press ^C again to kill the workers immediately.
    `);
  }

  await pauseAllQueues();
  process.exit(0);
});

const pauseAllQueues = async () => {
  return Promise.all(
    queues.map(queue => {
      // Pausing will stop the queue worker from picking up any new jobs
      // but continue working any in-flight jobs. The `true` arg pauses
      // only this instance of the queue, not globally (another queue worker
      // might be running elsewhere and it hasn't been asked to terminate, so
      // we don't need to interrupt it.)
      return queue.pause(true);
    })
  );
};

>All comments

In a script that starts workers for an array of queues that are processing jobs, queues, I've got something which uses Bull's Queue#pause to stop the workers from starting new jobs while letting them finish their current jobs (unless forced). Find the current implementation of pause here.

Hope this helps!

const queues = [emailQueue, webhookQueue, etc];

process.on("SIGTERM", async () => {
  setTimeout(() => {
    console.warn("Couldn't pause all queues within 30s, sorry! Exiting.");
    process.exit(1);
  }, 30000);

  await pauseAllQueues();
  process.exit(0);
});

let sawSIGINT;

process.on("SIGINT", async () => {
  if (sawSIGINT) {
    console.warn(`Killing the jobs. You monster.`);
    process.exit(1);
  }
  sawSIGINT = true;

  if (!["staging", "production"].includes(process.env.ENVIRONMENT)) {
    console.log(`
      Pausing the queue workers so they finish any current running jobs.
      Press ^C again to kill the workers immediately.
    `);
  }

  await pauseAllQueues();
  process.exit(0);
});

const pauseAllQueues = async () => {
  return Promise.all(
    queues.map(queue => {
      // Pausing will stop the queue worker from picking up any new jobs
      // but continue working any in-flight jobs. The `true` arg pauses
      // only this instance of the queue, not globally (another queue worker
      // might be running elsewhere and it hasn't been asked to terminate, so
      // we don't need to interrupt it.)
      return queue.pause(true);
    })
  );
};

Was this page helpful?
0 / 5 - 0 ratings

Related issues

ianstormtaylor picture ianstormtaylor  路  4Comments

davedbase picture davedbase  路  3Comments

joe-at-startupmedia picture joe-at-startupmedia  路  3Comments

weeco picture weeco  路  3Comments

btd picture btd  路  3Comments