Bull: Question: Reusing Queue instances vs. Reusing Redis Connections

Created on 4 Apr 2019 · 8 Comments · Source: OptimalBits/bull

My Use Case
My use case is that consumers produce many jobs in sequence inside the process function.

More specifically, I have a process function for a job that repeats every second, and each run adds around 100 jobs to around 10 queues. Each job in those 10 queues, when it is processed, adds between 1 and 1000 further jobs to the same 10 queues.
So my application starts by creating 10 Queue objects and calling process on each of them, with a concurrency limit that I will fine-tune.

My Question
Now that I have those 10 Queue objects, should I reuse them inside the process functions to add jobs to the same queues?

What is the best practice for reusing Queue objects / instances or Redis connections?

  1. Should I keep an in-memory cache of Queue objects keyed by queue name?

    1. If so, should I use one group of Queue objects for processing and a separate group for adding jobs?

  2. Or are Queue instances cheap, so that I'm better off reusing Redis connections via the createClient option mentioned in the documentation? Are there any concurrency issues with sharing Redis connections this way?

All 8 comments

@nitzanav

  1. Queue instances are cheap, but a cache in the form of a QueueProvider abstraction is still useful. It lets you request a Queue by name and configures one automatically if it doesn't already exist in the internal cache. That's what I do, and it helps especially in testing. I'd also recommend creating a Processor object that has a process function. This makes it easy to pass in references to any dependencies your processor needs, like the QueueProvider. The same design also makes things much easier if you later switch to path-based processor registration to scale your processors.

  2. No need to have different groups.

  3. I'd recommend reusing your Redis connections, and there should not be any concurrency issues if you implement the reuse as described in the docs.
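
The connection reuse in point 3 could look roughly like the sketch below. It is not Bull's implementation: it assumes the `createClient(type)` hook from Bull's docs, where `type` is `'client'`, `'subscriber'`, or `'bclient'`, and where `bclient` connections must not be shared. `Conn`, `connect`, and `makeCreateClient` are hypothetical names; `connect` stands in for constructing a real Redis client (e.g. ioredis), so the block runs without Redis.

```typescript
// Connection types Bull passes to `createClient`, per its docs on reusing connections.
type ClientType = 'client' | 'subscriber' | 'bclient'

// Hypothetical stand-in for a Redis connection (e.g. an ioredis instance).
interface Conn {
  id: number
}

// Build a `createClient` function that shares the regular and subscriber
// connections across all queues, but opens a fresh connection for every
// 'bclient' request, because blocking commands tie a connection up.
function makeCreateClient(connect: () => Conn): (type: ClientType) => Conn {
  let client: Conn | undefined
  let subscriber: Conn | undefined

  return (type: ClientType): Conn => {
    switch (type) {
      case 'client':
        if (!client) client = connect()
        return client
      case 'subscriber':
        if (!subscriber) subscriber = connect()
        return subscriber
      default:
        // 'bclient': never shared
        return connect()
    }
  }
}

// Demo with a counting fake instead of a real Redis client.
let opened = 0
const createClient = makeCreateClient(() => ({ id: ++opened }))

// Simulate three processing queues, each asking for all three connection types.
const queueNames = ['a', 'b', 'c']
queueNames.forEach(() => {
  createClient('client')
  createClient('subscriber')
  createClient('bclient')
})
console.log(opened) // prints 5 (1 client + 1 subscriber + 3 bclient) instead of 9
```

With a real client, the returned function would be passed as the `createClient` option when constructing each queue, so every queue in the process draws from the same shared pair.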

@nitzanav @gcox
Is it possible to share your QueueProvider and Processor code please? I'm very interested to see how others are doing this and learn some best practices.

You can see my queue manager file here

I believe it's missing the processor part that you have described.

Edit: Sorry, I made a mistake in addressing! @gcox : Is it possible to share your code please?

@gcox Thank you very much for the detailed answer!
@ashkank83 Thanks for the feedback, I will gladly share my code.

My QueueProvider code is a draft that was only just added and hasn't been tested, so it might have syntax issues or bugs. Reuse of Redis connections has not been added to the code yet.

I have a few layers of abstraction:

  • JobManager.process(jobData) - JobManager routes jobs to the different processors and wraps them with error handling and logging.
  • JobQueueApi is the API that abstracts over the choice of queue. It uses JobQueueApiBull, but I also have JobQueueApiFake, and in future could have a RabbitMQ implementation of the same interface.
  • The JobQueueApi.jobQueue(queueName) function provides the functionality of a QueueProvider, in a way. @gcox is that what you meant? @ashkank83 what is your opinion on this?
  • you might see a

Here is the code of these modules:
JobQueueApi

const JobQueueApiConcrete = require('./job_queue_api_bull')
const queues = {}
const processor = (job) => JobManager.process(job.data)

module.exports = class JobQueueApi {
    static find(queueName, jobId) {
        return this.jobQueue(queueName).find(jobId)
    }

    static async queue(queueName, jobData) {
        return this.jobQueue(queueName).queue(jobData)
    }

    static jobQueue(queueName) {
        if (!queues[queueName]) queues[queueName] = new JobQueueApiConcrete(queueName, processor)

        return queues[queueName]
    }

    static process(queue){
        this.jobQueue(queue.name).process(queue)
    }

    static empty(queueName){
        return this.jobQueue(queueName).empty()
    }

    static gracefullShutdown(queueName){
        return this.jobQueue(queueName).gracefullShutdown()
    }

}

JobQueueApiBull

const _ = require('lodash')
const Queue = require('bull')
const logger = require('../../winston')

const {redis} = require('config')
module.exports = class JobQueueApiBull {

    constructor(queueName, processor) {
        this.queueName = queueName
        this.bullQueue = new Queue(queueName, {redis})
        this.processor = processor
    }

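    async queue(jobData) {
        // Pass jobId as an option: Bull treats a string first argument as the
        // job name, while find() looks jobs up by id via getJob()
        return this.bullQueue.add(jobData, {jobId: jobData.jobId})
            .then(job => logger.info({status: 'queued', jobId: jobData.jobId, type: jobData.type}))
    }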

    async find(jobId) {
        return this.bullQueue.getJob(jobId)
    }

    process(queue){
        this.bullQueue.process("*", queue.concurrency, this.processor)
    }

    empty(){
        return this.bullQueue.empty()
    }

    async gracefullShutdown(){
        // Pausing will stop the queue worker from picking up any new jobs
        // but continue working any in-flight jobs. The `true` arg pauses
        // only this instance of the queue, not globally (another queue worker
        // might be running elsewhere and it hasn't been asked to terminate, so
        // we don't need to interrupt it.)
        await this.bullQueue.pause(true)
        await this.bullQueue.close()
    }

}

JobQueueApiFake

const config = require('config')

module.exports = class JobQueueApiFake {

    constructor(queueName, processor) {
        this.queueName = queueName
        this.processor = processor
        this.jobs = {}
    }

    async queue(jobData) {
        this.jobs[jobData.jobId] = jobData

        if (config.workers.default.queues.find(q => (q.name == this.queueName)).to_process) {
            try {
                await this.processor({data: jobData})
            } catch (err) {
                console.log(err)
            }
        } /* same logic will be in bull */
    }

    async find(jobId) {
        return { ...this.jobs[jobId], jobId }
    }

    static instance(queueName) {
        return new JobQueueApiFake(queueName)
    }

    process(){
    }

    empty(){
    }

    gracefullShutdown(){
    }

}
const JobManager = require('../core/job_manager')

JobManager

const logger = require('../../winston')

module.exports = class JobManager {
    static type(jobData) {
        return JOBS[jobData.type]
    }

    static instanciateJob(jobData) {
        return new (this.type(jobData))({ data: jobData })
    }

    // JobManager doesn't know what an automation is, which is why it cannot do those things itself
    static async process(jobData) {
        logger.info({status: 'processing', jobId: jobData.jobId, type: jobData.type})

        return this.instanciateJob(jobData)
            .process()
            .then(response => {
                logger.info({status: 'processed', jobId: jobData.jobId, type: jobData.type})
                return response
            })
            .catch(err => {
                logger.info({status: 'failed', jobId: jobData.jobId, type: jobData.type})
                console.error(err, err.stack);
                throw err
            })
    }
    static async queueOrDo(jobData) {
        return this.instanciateJob(jobData).queueOrDo()
    }

}

// Required at the bottom of the file to avoid a cyclic dependency
const JOBS = require('./job_types')

@nitzanav Yes, your JobQueueApi.jobQueue(queueName) function is serving as a type of QueueProvider

@ashkank83 My actual implementations of these concepts are quite a bit more involved than would be useful to post here, but I've included the general idea below (not tested, but tsc is happy with it 😄)

import Bull, { Queue, QueueOptions, Job, DoneCallback } from 'bull'

/** 
 * Queue provider
 * 
 * A default implementation automatically creates a queue and adds it to the
 * internal lookup hash if one doesn't already exist with the same name.
 */
export interface IQueueProvider {
  getQueue: (name: string, config?: QueueOptions) => Queue
}
export class DefaultQueueProvider implements IQueueProvider {
  static Queues: { [key: string]: Queue } = {}

  getQueue = (name: string, config?: QueueOptions): Queue => {
    let queue = DefaultQueueProvider.Queues[name]
    if (queue) {
      return queue
    }
    queue = new Bull(name, config)
    DefaultQueueProvider.Queues[name] = queue
    return queue
  }
}

/** 
 * Job processor
 * 
 * `BaseJobProcessor` exists so that all sub-classes have access to an `IQueueProvider`.
 *  Its constructor defaults to an instance of the `DefaultQueueProvider`
 * 
 * `SampleJobProcessor` is a minimal example showing how you'd reuse a queue instance
 * inside of a processor.
 */
export interface IJobProcessor {
  process: (job: Job, done: DoneCallback) => void
}
export class BaseJobProcessor implements IJobProcessor {
  private _queueProvider: IQueueProvider
  get queueProvider(): IQueueProvider {
    return this._queueProvider
  }

  constructor(queueProvider: IQueueProvider = new DefaultQueueProvider()) {
    this._queueProvider = queueProvider
  }

  process = (job: Job, done: DoneCallback): void => {
    throw new Error('BaseJobProcessor.process is abstract')
  }
}
class SampleJobProcessor extends BaseJobProcessor {
  process = (job: Job, done: DoneCallback): void => {
    const queueB = this.queueProvider.getQueue('QueueB')
    // Process the job, add jobs to queueB, etc
    done()
  }
}
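
Because a processor built this way depends only on the provider interface, a test can hand it a fake. The sketch below assumes nothing from Bull: `QueueLike`, `QueueProviderLike`, `FakeQueueProvider`, and `FanOutProcessor` are hypothetical names, and `QueueLike` covers only the `add` call this processor needs.

```typescript
// Hypothetical stand-ins covering only what the processor needs from a queue.
interface QueueLike {
  add(data: unknown): Promise<void>
}
interface QueueProviderLike {
  getQueue(name: string): QueueLike
}

// Fake provider: one in-memory queue per name, recording every added job.
class FakeQueueProvider implements QueueProviderLike {
  readonly added = new Map<string, unknown[]>()
  private queues = new Map<string, QueueLike>()

  getQueue(name: string): QueueLike {
    let queue = this.queues.get(name)
    if (!queue) {
      const jobs: unknown[] = []
      this.added.set(name, jobs)
      queue = { add: async (data: unknown) => { jobs.push(data) } }
      this.queues.set(name, queue)
    }
    return queue
  }
}

// Processor under test: adds one job to 'QueueB' per item in the incoming job.
class FanOutProcessor {
  private provider: QueueProviderLike

  constructor(provider: QueueProviderLike) {
    this.provider = provider
  }

  async process(job: { data: { items: string[] } }): Promise<void> {
    const queueB = this.provider.getQueue('QueueB')
    for (const item of job.data.items) {
      await queueB.add({ item })
    }
  }
}

const provider = new FakeQueueProvider()
const finished = new FanOutProcessor(provider).process({ data: { items: ['x', 'y'] } })
finished.then(() => {
  console.log(provider.added.get('QueueB')?.length) // logs 2
})
```

The same processor class would take a real provider in production, so the test exercises the fan-out logic without opening any Redis connection.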

@gcox
Thanks for sharing.
Is there any way to use the queueProvider when the job is running inside a sandbox?
At the moment, I have to create a new instance of the queue inside my sandboxed process, add a new job to it, and close it afterwards, which seems wasteful.

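import Queue from 'bull'

export default async function(job) {
  // Is there any way to use the Queue provider here?
  const queue = new Queue(job.data.processQueue);
  // Wait for the job to be added before closing the connections
  await queue.add('my new job');
  await queue.close()
}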

@ashkank83
You can use the QueueProvider but there would be a different internal cache of Queue instances per sandboxed node process. That's okay, the sandboxed processes are long-lived, so caching things in a sandboxed process is good.

For sandboxed processors, I use another file to create the processor instance for the purposes of injecting dependencies like the QueueProvider.

General idea:

// myProcessorSandbox.js
import { MyProcessor } from './processor'
import { DefaultQueueProvider } from './defaultQueueProvider'

export default new MyProcessor(
  new DefaultQueueProvider(),
  /* any other dependencies */
).process

// Wherever you register your processors
queue.process(?, ?, /* path to myProcessorSandbox.js */)

That gives your processor a QueueProvider instance, allows your processors to remain unchanged whether they are sandboxed or not, and also helps with testing.

@gcox Thanks again, I like the idea.

Given that we can't use the main queue cache, the only question is whether it would be cheaper to keep 3 connections to each of the N dependent queues open per sandbox job,
or to open the required queue every time and close it afterwards.

@ashkank83 That's entirely dependent on your specific system implementation (total concurrency count, job processing frequency, whether you're running your own Redis server or using a cloud host with connection/effective limitations, etc).
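
To make that trade-off concrete, here is a rough back-of-the-envelope model of the two strategies. Every number in it is an assumption, and the names (`Workload`, `steadyConnections`, `connectionChurnPerSecond`) are hypothetical. `connectionsPerQueue` defaults to the figure of 3 quoted in this thread; whether a queue used only for adding jobs really opens all three may depend on the Bull version, so treat it as a parameter.

```typescript
// Inputs for the estimate. All values are assumptions to be replaced with your own.
interface Workload {
  sandboxProcesses: number    // concurrent sandboxed child processes
  dependentQueues: number     // N queues each process adds jobs to
  connectionsPerQueue: number // 3, per the figure quoted in this thread
  jobsPerSecond: number       // jobs handled per second across all processes
}

// Strategy A: cache queues per process and keep their connections open.
// Returns the number of connections held open at steady state.
function steadyConnections(w: Workload): number {
  return w.sandboxProcesses * w.dependentQueues * w.connectionsPerQueue
}

// Strategy B: open the needed queues for each job and close them afterwards.
// Returns connections opened (and torn down) per second, assuming each job
// touches `queuesPerJob` queues.
function connectionChurnPerSecond(w: Workload, queuesPerJob: number): number {
  return w.jobsPerSecond * queuesPerJob * w.connectionsPerQueue
}

const example: Workload = {
  sandboxProcesses: 4,
  dependentQueues: 10,
  connectionsPerQueue: 3,
  jobsPerSecond: 100,
}
console.log(steadyConnections(example))           // 120 connections held open
console.log(connectionChurnPerSecond(example, 1)) // 300 connects per second
```

Under these made-up numbers, caching costs a fixed 120 open connections, while opening and closing per job costs 300 connection setups every second. A hosted Redis with a low connection cap and infrequent jobs could tip the balance the other way.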
