Bull: Gracefully shutdown sandboxed processor

Created on 26 Oct 2019 · 19 comments · Source: OptimalBits/bull

Description

I have deployed my workers on Heroku. Heroku restarts my server every 24 hours by sending SIGTERM. This causes my sandboxed processors to shut down immediately, even in the middle of handling a job.

My question is: how can I make sure that my sandboxed processors shut down gracefully when the process is stopped with SIGTERM?

Currently I am using queue.close() in the main worker file, but nothing shutdown-related in the processor file.

Bull version

^3.10.0

All 19 comments

queue.close() waits for the current processors to complete, so that should be enough; just make sure you are awaiting the call to close().
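A minimal sketch of that advice in plain JavaScript: the handler awaits close() on every queue, reuses the pending shutdown if a second signal arrives, and only then exits. The helper name createCloseOnSignal and its exit/log options are hypothetical, not part of Bull; any object whose close() returns a Promise (such as a Bull Queue) can be passed in, and exit is injectable so the handler can be exercised without ending the process.

```javascript
// Hypothetical helper (not part of Bull): returns a signal handler that awaits
// close() on every queue before exiting. `queues` may be any objects whose
// close() returns a Promise, e.g. Bull Queue instances.
function createCloseOnSignal(queues, options = {}) {
  const exit = options.exit || ((code) => process.exit(code));
  const log = options.log || console.log;
  let closing = null;

  return function onSignal(signal) {
    // A second SIGTERM/SIGINT while closing reuses the pending shutdown.
    if (closing) return closing;
    log(`Received ${signal}, closing ${queues.length} queue(s)`);
    closing = Promise.all(queues.map((queue) => queue.close()))
      .then(() => {
        log('All queues closed');
        exit(0);
      })
      .catch((error) => {
        log(`Closing failed: ${error && error.message}`);
        exit(1);
      });
    return closing;
  };
}

// Wiring, assuming `queue` is a Bull queue created elsewhere:
//   const onSignal = createCloseOnSignal([queue]);
//   process.on('SIGTERM', () => onSignal('SIGTERM'));
//   process.on('SIGINT', () => onSignal('SIGINT'));
```

Later comments in this thread report close() or pause() resolving while a sandboxed job was still running, so with sandboxed processors this alone may not be enough.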

Thanks for the quick reply!

This is what my code currently looks like:

worker.txt

This should work, right?

I think so. If you see the message "Queue closed" in the logs, then close has completed successfully.

These are my logs of the incident:

logs.txt

The line below is logged on the 'global:failed' event.

Oct 25 18:13:00 app/worker.1: [+] Job repeat:d4323af8388e3278537b0f97806bc370:1572027180000 failed. Reason Unexpected exit code: null signal: SIGTERM.

OK, so maybe SIGTERM is also killing the child processes. I will need to investigate how to solve that; maybe there is some setting when spawning, or something like that.
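One way to check that hypothesis outside Bull is a small Node script (a sketch, not Bull's own spawning code) that starts a child process, sends it SIGTERM, and inspects the (code, signal) pair from the child's 'exit' event. A child with no SIGTERM listener is terminated by Node's default action and reports code null and signal SIGTERM, the same pair as in the "Unexpected exit code: null signal: SIGTERM" failure above; a child that installs a listener keeps running and exits on its own terms. Whether the children receive the signal directly depends on the environment: Heroku, for example, documents that it sends SIGTERM to all processes in the dyno, and Ctrl+C in a terminal signals the whole foreground process group.

```javascript
const { spawn } = require('child_process');

// Starts a throwaway Node child running `source`, waits for it to print
// "ready", sends it SIGTERM, and resolves with its exit code and signal.
function sigtermChild(source) {
  return new Promise((resolve, reject) => {
    const child = spawn(process.execPath, ['-e', source], {
      stdio: ['ignore', 'pipe', 'inherit'],
    });
    child.on('error', reject);
    // Only signal once the child has finished installing its listeners.
    child.stdout.once('data', () => child.kill('SIGTERM'));
    child.on('exit', (code, signal) => resolve({ code, signal }));
  });
}

// No SIGTERM listener: Node's default action kills the child mid-"job".
const withoutListener = `
  process.stdout.write('ready');
  setTimeout(() => {}, 10000); // stands in for a long-running job
`;

// A listener that only logs: the signal no longer terminates the child,
// so the "job" runs to completion and the child exits cleanly.
const withListener = `
  process.on('SIGTERM', () => console.error('child: got SIGTERM, finishing job'));
  process.stdout.write('ready');
  setTimeout(() => process.exit(0), 200); // job finishes
`;

sigtermChild(withoutListener)
  .then((result) => console.log('without listener:', result))
  .then(() => sigtermChild(withListener))
  .then((result) => console.log('with listener:', result));
```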

Example of graceful shutdown that works well for my workers and sandboxed processors

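import Bull from 'bull'; // assumes esModuleInterop; otherwise `import Bull = require('bull');`

// Hypothetical wrapper class: the original comment shows only these two methods.
class WorkerManager {
  // Assumed shape: queues keyed by name (the code below calls this.queues.values()).
  private queues = new Map<string, Bull.Queue>();

  private setupGracefulShutdown() {
    process.on('SIGTERM', async () => {
      console.log('Got SIGTERM');
      // Hard deadline in case pausing hangs.
      setTimeout(() => {
        console.warn(`Couldn't pause all queues within 30s, sorry! Exiting.`);
        process.exit(1);
      }, 30000);

      try {
        await this.pauseAllQueues();
        process.exit(0);
      } catch (error) {
        console.error('Failed to pause queues', error);
        process.exit(1);
      }
    });
  }

  private async pauseAllQueues() {
    console.log('Pausing all queues');

    await Promise.all(
      Array.from(this.queues.values()).map(queue => {
        /**
         * Pausing will stop the queue worker from picking up any new jobs
         * but continue working any in-flight jobs. The `true` arg pauses
         * only this instance of the queue, not globally (another queue worker
         * might be running elsewhere and it hasn't been asked to terminate, so
         * we don't need to interrupt it.)
         */
        return queue.pause(true);
      })
    );
    console.log('Paused all queues, exiting gracefully');
  }
}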

I'm also experiencing this problem. I want to wait for the current jobs to finish before shutting down my server.

queue.close() resolves without any problem, but my current job has not finished. The job is, however, picked up a minute after my server starts again, so it's not lost in limbo. I'd just prefer it to finish before shutdown, if possible.

I have also tried to pause the queue (in multiple ways) before running .close() but it makes no difference. @rosslavery's solution did not work for me.

I was experiencing the same issue

I was creating my queue doing:

const q = new Queue('my-queue')

I then changed it to:

const q = new Queue('my-queue', {
  redis: {
    port: 6379,
    host: 'localhost',
  }
})

Now close waits for the jobs to finish processing.

The issue was that the value checked at https://github.com/OptimalBits/bull/blob/4517dabeb380d9cc91a61dde065c7dd1fae972dd/lib/queue.js#L782 was undefined.

I have a similar problem. I am trying to shut down the system but want to let the jobs finish first. I pause all queues and expect the jobs to finish before exiting, but the actual behavior is that the promise resolves before the jobs are finished. I looked in the source code, and it looks like bclientInitialized is never set anywhere.

@andreas-engman It's set there: https://github.com/OptimalBits/bull/blob/4517dabeb380d9cc91a61dde065c7dd1fae972dd/lib/queue.js#L286

But for some reason, in some cases it doesn't reach this code

I implemented @rosslavery's logic, and at first it appeared to be working. Unfortunately, yesterday the issue popped up again...

This is my code:

// log, LOG_TYPES, mongodb and client are this app's own helpers.
const shutdown = async (code) => {
  await queue.pause(true).catch(error => log(LOG_TYPES.ERROR, error));
  log(LOG_TYPES.INFO, 'Paused queue, exiting gracefully');
  // Give outstanding completed/failed events 5 seconds before disconnecting.
  setTimeout(async () => {
    if (mongodb.getConnection()) await mongodb.getConnection().close();
    log(LOG_TYPES.INFO, 'MongoDB disconnected');
    client.quit(() => {
      log(LOG_TYPES.INFO, 'Redis client disconnected');
      process.exit(code);
    });
  }, 5000);
};

On SIGTERM I pause the queue, and it seems to shut down gracefully. I wait 5 seconds to make sure all completed and failed events are handled; after that, the process shuts down. These are the logs:
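The fixed 5-second wait is a guess. A sketch of an alternative, assuming the worker can observe Bull's local 'active', 'completed' and 'failed' events, is to count the jobs this process has started but not yet finished and wait until that count reaches zero, with an upper bound. trackInFlight and waitForIdle are hypothetical names, and any EventEmitter can stand in for the queue. This only changes how long the parent waits; it does not stop a sandboxed child from being killed by the signal, which is the underlying problem the rest of this thread uncovers.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: counts jobs this worker has started but not finished,
// based on the local 'active', 'completed' and 'failed' events of a queue.
function trackInFlight(queue) {
  let inFlight = 0;
  const idle = new EventEmitter();

  const finish = () => {
    inFlight = Math.max(0, inFlight - 1); // guard against unmatched events
    if (inFlight === 0) idle.emit('idle');
  };
  queue.on('active', () => { inFlight += 1; });
  queue.on('completed', finish);
  queue.on('failed', finish);

  return {
    count: () => inFlight,
    // Resolves true once nothing is in flight, or false after timeoutMs.
    waitForIdle(timeoutMs) {
      if (inFlight === 0) return Promise.resolve(true);
      return new Promise((resolve) => {
        const onIdle = () => {
          clearTimeout(timer);
          resolve(true);
        };
        const timer = setTimeout(() => {
          idle.removeListener('idle', onIdle);
          resolve(false);
        }, timeoutMs);
        idle.once('idle', onIdle);
      });
    },
  };
}

// Possible use in the shutdown handler, assuming `queue` is a Bull queue:
//   const tracker = trackInFlight(queue); // set up at startup
//   await queue.pause(true);
//   await tracker.waitForIdle(25000);
```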

Jan 10 23:10:02 app/worker.1: [+] Process SIGTERM
Jan 10 23:10:02 app/worker.1: [+] Paused queue, exiting gracefully
Jan 10 23:10:04 heroku/worker.1: Starting process with command "node --optimize_for_size --max_old_space_size=32 worker.js"
Jan 10 23:10:04 heroku/worker.1: State changed from starting to up
Jan 10 23:10:07 app/worker.1: [+] Starting processors...
Jan 10 23:10:07 app/worker.1: [+] Processors started.
Jan 10 23:10:08 app/worker.1: [+] MongoDB disconnected
Jan 10 23:10:08 app/worker.1: [+] Redis client disconnected
Jan 10 23:10:08 app/worker.1: [+] About to exit with code 0
Jan 10 23:10:08 heroku/worker.1: Process exited with status 0

In practice, some jobs are still running in the background, but no completed or failed events are emitted.

When the new worker starts, these jobs are picked up again and are handled the way they should. Unfortunately this causes jobs to be executed twice.

I have been able to reproduce the issue in a simplified environment. I will clean up my code and post it later on, but I have learned that the issue is only related to sandboxed processes. When using a "normal" process function, pausing the queue works beautifully.

I created a simplified environment using a recurring job that runs every minute.
I use a mocked http post function to simulate an async http call that takes 5 seconds.

Start the Node process using "node index.js".

When a job starts, it logs "starting job..."; when it's done, it logs "done!".

Terminate the Node process right after you see "starting job..." and you will notice that "done!" is never logged.

I also implemented a normal process function (see the commented-out code in index.js). When using that process function instead, you will notice that "done!" is still logged when you terminate the process while the job is running.

@manast I hope this gives you enough to reproduce the issue. If not, please let me know what else I can provide.

This is my code:

package.json

{
  "name": "bull-test",
  "version": "1.0.0",
  "engines": {
    "node": "12.13.1"
  },
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "bull": "^3.10.0",
    "ioredis": "^4.11.2"
  }
}

index.js

const REDIS_URL = process.env.REDIS_URL;

const path = require('path');
const Queue = require('bull');
const Redis = require('ioredis');
const { post } = require("./mock");

const client = new Redis(REDIS_URL);

let subscriber;

const options = {
  createClient(type) {
    switch (type) {
      case 'client':
        return client;
      case 'subscriber':
        return getOrCreateSubscriber();
      default:
        return new Redis(REDIS_URL);
    }
  },
};

const getOrCreateSubscriber = () => {
  if (!subscriber) subscriber = new Redis(REDIS_URL);
  return subscriber;
};

const queue = new Queue('tester', options);

let shuttingDown = false;

(async () => {
  try {
    queue.on('global:completed', async (jobId, result) => {
      const payload = JSON.parse(result);
      console.log("result", payload);
    });

    queue.on('global:failed', async (jobId, reason) => {
      const job = await queue.getJob(jobId);
      console.log("job", job);
    });

    const shutdown = async (code) => {
      // log/LOG_TYPES do not exist in this file, so errors go to console.error.
      await queue.pause(true).catch(error => console.error(error));
      console.log('Paused queue, exiting gracefully');
      // Give outstanding completed/failed events 5 seconds before quitting.
      setTimeout(() => {
        client.quit(() => {
          console.log('Redis client disconnected');
          process.exit(code);
        });
      }, 5000);
    };

    process.on('SIGTERM', () => {
      console.log('Process SIGTERM');
      if (!shuttingDown) {
        shuttingDown = true;
        shutdown(0);
      }
    });

    process.on('SIGINT', () => {
      console.log('Process SIGINT');
      if (!shuttingDown) {
        shuttingDown = true;
        shutdown(0);
      }
    });

    process.on('unhandledRejection', (reason) => {
      // JSON.stringify() on an Error or a Promise prints "{}", so log the values directly.
      console.log('Process unhandled rejection:', reason);
      shutdown(1);
    });

    process.on('uncaughtException', (error) => {
      console.log('Process uncaught exception:', error);
      shutdown(1);
    });

    console.log('Starting tester processors...');

    const concurrency = 1;
    const processor = path.join(__dirname, 'processor.js');

    // comment out this line if you want to use a normal process function
    queue.process('*', concurrency, processor);

    // uncomment these lines if you want to use a normal process function
    // queue.process(async function (job) {
    //   try {
    //     console.log("starting job...");
    //     await post().catch(error => console.log(error));
    //     console.log("done!");
    //     return job.data;
    //   } catch (error) {
    //     console.log("error", error);
    //     throw error;
    //   }
    // });

    console.log('Processors started.');
    console.log('Process', process.pid);
  } catch (error) {
    console.log("error", error);
  }
})();

processor.js

const { post } = require("./mock");

module.exports = async (job) => {
  try {
    console.log("starting job...");
    await post().catch(error => console.log(error));
    console.log("done!");
    return job.data;
  } catch (error) {
    console.log("error", error);
    throw error;
  }
};

mock.js (to simulate an outgoing async http call)

const post = () => {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve();
    }, 5000);
  });
};

module.exports = {
  post
};

@rosslavery As I am trying to find a solution to this problem and it's working for you, would you mind sharing which version of bull you are using and how your code differs from my implementation above? It would be greatly appreciated!

Regarding the earlier suggestion to pass explicit redis connection options to the Queue constructor: I tried this, but without any luck. Do you use sandboxed processors?

For those who come here later, my sandboxed processors were exiting because of SIGTERM.

By adding an empty event listener in my processor file, I was able to keep my sandboxed processes alive and start a graceful shutdown:

process.on('SIGTERM', () => {
  console.log('SIGTERM processor');
});
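The empty listener works because it replaces Node's default SIGTERM action, but a process that never exits on SIGTERM is eventually force-killed (on Heroku, SIGKILL follows 30 seconds after SIGTERM). A middle ground, sketched here under assumptions, is to keep the child alive for a bounded grace period so the current job can finish, then exit. installSigtermGrace and its options are hypothetical, and the 25-second default is an assumption chosen to stay inside Heroku's 30-second window. The parent still needs to pause or close its queues so no new jobs are handed to the child.

```javascript
// Hypothetical helper for the top of a sandboxed processor file. Instead of
// ignoring SIGTERM forever, it lets an in-flight job keep running but forces
// the child to exit if it is still alive after graceMs.
function installSigtermGrace({
  graceMs = 25000, // assumption: below Heroku's 30 s SIGTERM-to-SIGKILL window
  proc = process, // object to listen on; injectable for testing
  exit = (code) => process.exit(code),
  log = console.log,
} = {}) {
  let timer = null;
  proc.on('SIGTERM', () => {
    if (timer) return; // already counting down
    log(`processor got SIGTERM; forcing exit in ${graceMs} ms if still running`);
    timer = setTimeout(() => exit(1), graceMs);
    // Don't let this timer by itself keep the child alive.
    timer.unref();
  });
}

// In processor.js, assuming this helper is pasted above the export:
//   installSigtermGrace();
//   module.exports = async (job) => { /* ... */ };
```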

my sandboxed processors were exiting because of SIGTERM

@oavanruiten I'm pretty sure that's the expected behaviour; you are supposed to call close() in the SIGTERM listener.

Read up on SIGTERM; what you are doing now is kind of wrong. Once your server receives SIGTERM, it's supposed to shut down as soon as possible (once everything is cleaned up). Here you are just ignoring SIGTERM, so your server will be killed by a SIGKILL, which should only happen on exceptional occasions.

I still believe this issue should be kept open (even though I found a weird workaround for my case).

@mastilver I appreciate you sharing your thoughts.

I am no expert on running separate processes in Node, but this is what I do:

I listen to SIGTERM in my main process, which initializes the sandboxed processors. In this listener I clean up using queue.pause(true), after which I exit using process.exit().

I was not handling the SIGTERM signal in the sandboxed processes, which caused them to shut down immediately (even when they were running a job).

Adding the empty listener kept my sandboxed processes from shutting down. Now I am able to wait for all jobs to finish before exiting in the main process. When that process shuts down, all sandboxed processes are killed as well.

I know this is not the best practice, but it does fix my issue...

I would love to hear other opinions on how to handle this better.

@oavanruiten Gotcha, I didn't understand what you meant by sandboxed processes.

What you are doing makes sense to me now.
