Rq: Programmatically create more workers at run-time

Created on 7 Apr 2016 · 4 Comments · Source: rq/rq

Is there a way to programmatically create more workers at run-time? When a worker finishes its job, it simply dies. This way, I can minimize the number of jobs waiting in the queue.

All 4 comments

Yes, you can. But because you want the worker to die, you'll have to start

  • an always-running worker using the rq worker command, and
  • on-demand workers, by saving the following code in a file, say on_demand_workers.py, and then calling $ python on_demand_workers.py

Both commands should ideally be run under a process monitor (supervisor or circus).

#!/usr/bin/python3

import sys
import time
from rq import Connection, Worker
from redis import Redis

redis = Redis(host='localhost')

def need_burst_workers():
    # check database or redis key to determine whether burst worker army is required
    return True #boolean

def num_burst_workers_needed():
    # check the number, maybe divide the number of pending tasks by n
    return 10 #integer

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            [Worker(qs).work(burst=True) for i in range(num_burst_workers_needed())]
        else:
            time.sleep(10) #in seconds

if __name__ == '__main__':
    qs = sys.argv[1:] or ['default']
    main(qs)
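The comment in num_burst_workers_needed suggests dividing the number of pending tasks by n. A minimal sketch of that calculation follows. The function name and the jobs_per_worker and max_workers parameters are hypothetical, and the pending count is assumed to be obtained elsewhere (with RQ, for instance, from the length of the queue).

```python
import math


def burst_workers_for(pending_jobs, jobs_per_worker=5, max_workers=10):
    """Return how many burst workers to start for a given backlog.

    All parameter names and defaults here are illustrative assumptions,
    not part of RQ.
    """
    if jobs_per_worker <= 0:
        raise ValueError("jobs_per_worker must be positive")
    if pending_jobs <= 0:
        # Nothing is waiting, so no burst workers are needed.
        return 0
    # Round up so a partial batch still gets a worker, then cap the total.
    return min(max_workers, math.ceil(pending_jobs / jobs_per_worker))
```

The cap keeps a sudden backlog from spawning an unbounded number of processes on one machine.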

Hope this helps. Thank you.

Worker.work is a blocking call, so this code runs the ten workers sequentially, one after another. At the very least, wrap the work method call in multiprocessing.

@proofit404: I didn't realize this while answering. Thanks for pointing it out.

The updated main function is:

import multiprocessing

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            [multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True}).start() for i in range(num_burst_workers_needed())]
        else:
            time.sleep(10) #in seconds

Update 0:
Using a for loop instead of a list comprehension, as suggested by @proofit404:

import multiprocessing

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            for i in range(num_burst_workers_needed()):
                multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True}).start()
        else:
            time.sleep(10) #in seconds
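The loop above discards each Process handle right after start(). If the launcher should also know when the burst workers have finished and whether any of them failed, one option is to keep the handles and join them. This is only a sketch: start_and_wait and its ctx parameter are hypothetical names, not part of RQ, and it uses nothing beyond the standard multiprocessing module.

```python
import multiprocessing


def start_and_wait(count, target, args=(), kwargs=None, ctx=None):
    """Start `count` processes running `target` and wait for all of them.

    Returns the list of exit codes; 0 means the process exited cleanly.
    `ctx` may be a multiprocessing context (hypothetical convenience
    parameter); by default the multiprocessing module itself is used.
    """
    ctx = ctx or multiprocessing
    processes = [
        ctx.Process(target=target, args=args, kwargs=kwargs or {})
        for _ in range(count)
    ]
    # Start everything first so the processes run in parallel ...
    for process in processes:
        process.start()
    # ... and only then block until each one has exited.
    for process in processes:
        process.join()
    return [process.exitcode for process in processes]
```

With the definitions from the snippet above, the loop could then become start_and_wait(num_burst_workers_needed(), Worker(qs).work, kwargs={'burst': True}). As with the original code, passing a bound method of a Worker as the target assumes a start method such as fork; under spawn the target has to be picklable, which may not hold for a Worker.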

Source : Stack Overflow

@shivekkhurana looks correct.

I prefer to use for loops to perform actions (like process creation) and list comprehensions to build data structures. So in this case I suggest using a for loop.
