Bull: How to remove repeatable jobs?

Created on 21 Jul 2018  ·  27 Comments  ·  Source: OptimalBits/bull

Description

Calling queue.empty() will not remove the repeatable job. I made a repeatable job at cron 1 minute interval, then tried upping it to 5 minutes. The result was that I had two cron jobs: one at the 1 minute interval, the other at the 5 minute interval. I would like to know if there is an API call to remove a repeat/cron so that I can essentially update/change the cron for a job vs having multiple crons for a job.

Minimal, Working Test code to reproduce the issue.

paymentsQueue.process(function(job){
  // Check payments
});

// Repeat the payment job every minute
paymentsQueue.add(paymentsData, {repeat: {cron: '*/1 * * * *'}});
// Does not remove the one-minute repeatable job
paymentsQueue.empty()
// Adds a second repeatable job that runs every 5 minutes
paymentsQueue.add(paymentsData, {repeat: {cron: '*/5 * * * *'}});

Now I have two repeatable jobs, but I only want one.
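For readers landing here, a minimal sketch of the "replace the schedule" flow discussed in the comments below: remove the old repeat configuration, then add the new one. It assumes `queue` is a Bull Queue instance and that `removeRepeatable` receives the same repeat options that were used in `add`. The helper name `replaceRepeatable` is hypothetical and not part of Bull.

```javascript
// Hypothetical helper: swap one repeat schedule for another on an unnamed job.
// `queue` is assumed to be a Bull Queue; `oldRepeat` must match the options
// originally passed as `repeat` when the job was added.
async function replaceRepeatable(queue, data, oldRepeat, newRepeat) {
  // Drop the existing schedule so it stops producing new job instances.
  await queue.removeRepeatable(oldRepeat);
  // Register the job again under the new schedule.
  return queue.add(data, { repeat: newRepeat });
}

// Example call (inside an async function):
// await replaceRepeatable(paymentsQueue, paymentsData,
//   { cron: '*/1 * * * *' }, { cron: '*/5 * * * *' });
```

If the job was added with a custom jobId, later comments in this thread suggest that it has to be supplied with the repeat options as well.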

Bull version

3.4.3

Additional information

None.

question

All 27 comments

I'm not sure how I missed that option :O Thanks.

You can also use the undocumented .removeRepeatableByKey(key) to remove a job by its key

removeRepeatableByKey

O.o Where is this located? This might solve my problem. How would I get the key? Is it the same as the jobId, or something entirely different?

@stevenolay you could get the key by for example calling getRepeatableJobs
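A rough sketch of that suggestion: list the repeatable jobs, pick the entries that match a name and cron expression, and remove them by key. It assumes `queue` is a Bull Queue and that each entry returned by `getRepeatableJobs` exposes `key`, `name` and `cron` fields. The helper name `removeRepeatableByCron` is made up for illustration.

```javascript
// Hypothetical helper: remove every repeatable job matching a name and cron.
// `queue` is assumed to be a Bull Queue instance.
async function removeRepeatableByCron(queue, name, cron) {
  const repeatableJobs = await queue.getRepeatableJobs();
  // Keep only the repeat configurations we want to delete.
  const matches = repeatableJobs.filter(
    (job) => job.name === name && job.cron === cron
  );
  // Remove them one at a time so a failure surfaces immediately.
  for (const job of matches) {
    await queue.removeRepeatableByKey(job.key);
  }
  return matches.length; // number of repeat configurations removed
}
```

This still loads the list of repeatable jobs into memory, so it may not suit queues with thousands of them.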

@manast Well, using getRepeatableJobs would not be good for me because I am expecting on the order of thousands of repeatable jobs. Iterating across those in memory would be less than ideal.

@manast I took a look at the payload for getRepeatableJobs and looked through the helper functions for repeat jobs in lib/repeatable.js, and was able to figure out how to construct job keys dynamically.

One thing I noticed: even if the jobs are removed and getRepeatableJobs returns an empty list, the jobs still appear in Redis when I call "keys *" in redis-cli. Do you have any insight into this?

getRepeatableJobs returns only the canonical jobs, so to speak. The real jobs that have been processed will stay in the queue unless removed.
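If scheduled instances are still left over after the repeat configuration is gone, one possible cleanup is sketched below. It assumes `queue` is a Bull Queue, that the leftovers sit in the delayed set, and that their ids start with `repeat:` as in the job JSON posted later in this thread. The helper name is hypothetical, and depending on the Bull version this step may not be needed at all.

```javascript
// Hypothetical helper: delete delayed job instances created by repeat schedules.
// Assumption: such instances have ids prefixed with "repeat:".
async function removePendingRepeatInstances(queue) {
  const delayed = await queue.getDelayed();
  const stale = delayed.filter((job) => String(job.id).startsWith('repeat:'));
  for (const job of stale) {
    await job.remove(); // removes this single job instance from the queue
  }
  return stale.length; // number of instances removed
}
```

Completed or failed instances are not touched by this sketch; those would need their own cleanup.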

Hey @manast, could you kindly provide an example usage of:
https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueremoverepeatable

I can't get it to work for me even though I'm reusing the same options object.

.removeRepeatableByKey(key) seems to work fine.

When I tried removeRepeatableByKey, I got a "removeRepeatableByKey is not a function" error:
queue_1.taskQueue.removeRepeatableByKey is not a function

I am also unable to remove my repeatable job with taskQueue.removeRepeatable('task', { cron: '0 47 6 * * 4' });

JSON of the JOB:
{ id: "repeat:09854c8042eced1337a7d8eec9357528:1552526220000", name: "task", data: { eventName: "test", parameters: [ { my_JSON }, { my_JSON } ] }, opts: { repeat: { count: 1, cron: "0 47 6 * * 4", jobId: "myJobId" }, jobId: "repeat:09854c8042eced1337a7d8eec9357528:1552526220000", delay: 603096068, timestamp: 1551923123932, prevMillis: 1552526220000, priority: 1, attempts: 3, removeOnComplete: true, backoff: { type: "fixed", delay: 3 }, timeout: 60000 }, progress: 0, delay: 603096068, timestamp: 1551923123932, attemptsMade: 0, stacktrace: [ ], returnvalue: null, finishedOn: null, processedOn: null }

@ganeshcse2991 if you provide a code snippet that reproduces the error we can help you much faster.

@manast sure the following is my code:

export const addTaskCard = function (queue_options) {
  logger.info('Inside Task Queue Addition Block');
  try {
    if (!isNullOrUndefined(queue_options)) {
      let jobOptions = {
        priority: queue_options.priority,
        repeat: { cron: '0 47 6 * * 4' },
        attempts: 3,
        removeOnComplete: true,
        jobId: queue_options.job_id,
        backoff: 3,
        timeout: 60000,
      };
      taskQueue.add('task', queue_options.data, jobOptions);
    }
  }
  catch (error) {
    logger.error(error)
    logger.error("Job Not added to the queue")
  }
};

let processTaskQueue = function () {
  logger.info('***********************Processing Task Queue***********************');
  taskExecutor.process('task', 100, function (job, done) {
    eventEmitter.emit(job.data.eventName, job.data.parameters);
    done();
  });

  taskExecutor.on('completed', function (job, result) {
    logger.info('Task Created for job: ' + job.id + ' and result is: ' + result);
  });

  taskExecutor.on('error', function (job, result) {
    logger.error('Error Occured while processing the Job in the Queue');
    logger.error(result);
  });
};

export async function removeJob(jobId, cron) {
  console.log("Inside get job")
  console.log(jobId);
  console.log(cron)
  try {
    let job = await taskQueue.removeRepeatable('task', {cron : '0 47 6 * * 4'});
    //let jobs = await taskQueue.getRepeatableJobs();
    console.log(job)
    //let job = await taskQueue.removeRepeatableByKey(jobs[0].key);

  } catch (error) {
    //console.log(error);
  }
}

processTaskQueue();

I tried both methods, removeRepeatableByKey and removeRepeatable. removeRepeatableByKey gives me an "is not a function" error.

queue_1.taskQueue.removeRepeatableByKey is not a function

How I create my task queue:

export const taskQueue = new Queue('task', appConfig[process.env.NODE_ENV].redis);
export const taskExecutor = new Queue('task', appConfig[process.env.NODE_ENV].redis);

@eltronix Can you give an example of removeRepeatableByKey? It's throwing this error for me:

queue_1.taskQueue.removeRepeatableByKey is not a function

@ganeshcse2991 I currently use this to remove all my repeatable jobs. queues is an object with all my queues.
```
const repeatableJobs = await queues.syncCommitsQueue.getRepeatableJobs();

if (repeatableJobs.length > 0) {
  console.log('Current repeatable configs: ', repeatableJobs);

  // Remove each repeatable job by its key, awaiting each removal in turn
  for (const job of repeatableJobs) {
    await queues.syncCommitsQueue.removeRepeatableByKey(job.key);
  }
}
```

@eltronix taskQueue.removeRepeatableByKey(jobs[0].key); is what I am trying too. It says removeRepeatableByKey is not a function.

@ganeshcse2991 what version are you using?

@eltronix version - "bull": "^3.4.4",

@eltronix https://github.com/OptimalBits/bull/blob/develop/test/test_repeat.js#L362

@manast That's for a named repeatable job. What if it isn't named, would it still work?

@ganeshcse2991 I'm using "bull": "^3.7.0"
Try upgrading to that.

You can also try using removeRepeatable with a named repeatable job as demonstrated here:
https://github.com/OptimalBits/bull/blob/develop/test/test_repeat.js#L362

@eltronix Thank you, updating the version fixed the issue :)

@manast I am able to remove the repeatable job now. The problem was that I was adding the job to the queue with:

let jobOptions = {
  priority: queue_options.priority,
  repeat: { cron: '0 47 6 * * 4' },
  attempts: 3,
  removeOnComplete: true,
  jobId: queue_options.job_id,
  backoff: 3,
  timeout: 60000,
};
taskQueue.add('task', queue_options.data, jobOptions);

And trying to remove the job like:

let job = await taskQueue.removeRepeatable('task', {cron : '0 47 6 * * 4'});

I fixed it by passing the jobOptions, and I was able to successfully remove the job from the queue.

let job = await taskQueue.removeRepeatable('task', jobOptions);

IMO, why should I even have to pass the entire job options to remove the job from the queue? Shouldn't jobId and cron be enough to remove it? It will be tough to pass the jobOptions that were used when creating the job to the function that deletes the job from the queue.

Life would be simpler if you could provide us a method where we can just delete the repeatable jobs too with

queue.removeJob(customJobId);

@ganeshcse2991

If you are using TypeScript, it's important to note that the type definition for removeRepeatableByKey is not exposed. You can get around this by doing:

taskQueue["removeRepeatableByKey"](thisIsMyJobKeyVar)

I did it with:

queue.removeRepeatable('nameOfProcess', { cron: cronValueUsedOnAdd });

The second parameter is the "repeat options", not the "job options".
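To illustrate the distinction, here is a small sketch that derives the argument for removeRepeatable from the job options used in add. It assumes that a custom jobId, when one was set, has to travel with the repeat options, as the `{ cron, jobId }` examples elsewhere in this thread do. The helper name `toRepeatOpts` is hypothetical.

```javascript
// Hypothetical helper: build the "repeat options" argument for removeRepeatable
// from the "job options" object that was passed to queue.add().
function toRepeatOpts(jobOptions) {
  // Copy the repeat settings (cron, tz, every, ...) without mutating the input.
  const repeatOpts = { ...jobOptions.repeat };
  // Assumption: a custom jobId must be included so the same schedule is matched.
  if (jobOptions.jobId !== undefined) {
    repeatOpts.jobId = jobOptions.jobId;
  }
  return repeatOpts;
}

// Example call (inside an async function):
// await taskQueue.removeRepeatable('task', toRepeatOpts(jobOptions));
```

Only the cron expression and the jobId need to be stored to remove the job later, not the full job options.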

Guys, I have a problem with removing repeatable jobs.

I want to replace the previous job with a new one. For this I remove the job like this:

export const deleteReminder = async () => {
  const repeatableJobs = await needsQueue.getRepeatableJobs();

  repeatableJobs.forEach(async job => await needsQueue.removeRepeatableByKey(job.key));
};

As far as I can see in the console, it works. But when I check Redis, I see that the records weren't removed. Does this command also remove the job from Redis? I still see the old jobs being called =|

@KonstantinKudelko https://github.com/OptimalBits/bull/issues/1277#issuecomment-534939664

Example:

  public removeAllRepeatable = async (): Promise<void> => {
    const queue = await this.queue;
    const jobs = await queue.getRepeatableJobs();
    for (let i = 0; i < jobs.length; i++) {
      const job = jobs[i];
      await queue.removeRepeatable({ cron: job.cron, jobId: job.id });
    }
  };

@stansv

Will this kill active jobs as well?
For example: let's say I have a repeated job scheduled to run every 5 minutes, and I then want to change it to every 10 minutes. Let's also say that at the time of deployment there is an active repeated job X (of the 5-minute type), and on deployment I remove all existing repeated jobs of that queue (as your code suggests).
Will this kill the active job X too?
How can I make sure that active jobs are awaited to completion (and actually get executed) before I remove the repeated jobs?

I've been trying to delete the repeatable job but have been unable to do so. I tried the various suggestions from this thread.

await workerPrb.removeRepeatable({ repeat: repeatOpts, jobId: job.id })
