Bull: Questions about Queue.process()

Created on 11 Feb 2018  路  5Comments  路  Source: OptimalBits/bull

Q1: why Queue.process() return a promise, and when will it resolve/reject ?

My code:

$Q.process(typeToQueueId(type, false), 1, async (job) => {
    log('queue %s process: %s', type, job.id);
    await handler(await getJobDocument(job));
    log('queue %s complete: %s', type, job.id);
}).catch((e) => { /// <<-- is this requried??
    logErr('process function rejected ??!! ', e);
});

*Q2: Also, from source code, .process() will always return a promise. But .d.ts says it will return different when pass different argument:

process(callback: (job: Job) => void): Promise;
process(callback: (job: Job, done: DoneCallback) => void): void;

Is this a mistake or ?

question

All 5 comments

Q3: (maybe bug?)
as code above, I have many (unknow number of) queue, I want each queue can only process single job at same time.
So I set the secound param concurrency to 1, but my job still process concurrentlly.

I got logs like:

queue 1 process: 100
queue 1 process: 101
queue 1 process: 102
queue 1 complete: 101
queue 1 complete: 102
queue 1 complete: 100

Why...

Q4: When I start N same servers. each one can push / process job with same way. And only one redis server. Is my queues only run single job at same time. or run N jobs ?

Q1: the promise resolves when the queue is closed.
Q2: we do not maintain the types, so any difference between this library and the types must be reported in the types repo.
Q3: Without code to reproduce we cannot know why you get that result.
Q4: If you have N servers with concurrency 1, you will at most process N jobs at the same time.

Really thanks for your reply.

Sorry for my bad english, my code of Q3 is same with Q1.

@GongT I do not know why you get the result with the example code but check this out:

const Queue = require("bull");

const $Q = new Queue("my test");

$Q.process(1, async job => {
  console.log("process", job.data);
  await wait();
  console.log("complete", job.data);

  if(job.data.i === 9){
    $Q.close();
  }
})
  .catch(e => {
    /// <<-- is this requried??
    logErr("process function rejected ??!! ", e);
  })
  .then(() => {
    console.log("Done processing jobs");
  });

 function start() {
  console.log("Adding jobs");
  for (let i = 0; i < 10; i++) {
    $Q.add({ foo: "bar", i });
  }
  console.log("done adding");
}

async function wait() {
  return new Promise(resolve => setTimeout(resolve, 1000));
}

start();

Result:

Adding jobs
done adding
process { foo: 'bar', i: 0 }
complete { foo: 'bar', i: 0 }
process { foo: 'bar', i: 1 }
complete { foo: 'bar', i: 1 }
process { foo: 'bar', i: 2 }
complete { foo: 'bar', i: 2 }
process { foo: 'bar', i: 3 }
complete { foo: 'bar', i: 3 }
process { foo: 'bar', i: 4 }
complete { foo: 'bar', i: 4 }
process { foo: 'bar', i: 5 }
complete { foo: 'bar', i: 5 }
process { foo: 'bar', i: 6 }
complete { foo: 'bar', i: 6 }
process { foo: 'bar', i: 7 }
complete { foo: 'bar', i: 7 }
process { foo: 'bar', i: 8 }
complete { foo: 'bar', i: 8 }
process { foo: 'bar', i: 9 }
complete { foo: 'bar', i: 9 }
Done processing jobs
Was this page helpful?
0 / 5 - 0 ratings

Related issues

pintocarlos picture pintocarlos  路  3Comments

JSRossiter picture JSRossiter  路  3Comments

btd picture btd  路  3Comments

thelinuxlich picture thelinuxlich  路  3Comments

joe-at-startupmedia picture joe-at-startupmedia  路  3Comments