Celery: how to create per worker process variable

Created on 8 May 2017  ·  1 Comment  ·  Source: celery/celery

Checklist

  • [ ] I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • [x] I have verified that the issue exists against the master branch of Celery.

Steps to reproduce

This question comes from a problem where calling producer.flush() makes Celery hang.

The code is pasted below:

from celery import Celery
from kafka import KafkaProducer

app = Celery('test', broker='redis://127.0.0.1:6379/0')

producer = KafkaProducer(bootstrap_servers=['172.16.24.45:9092', '172.16.24.44:9092'])

@app.task
def send_msg():
    # producer = KafkaProducer(bootstrap_servers=['172.16.24.45:9092', '172.16.24.44:9092'])
    for i in range(10):
        producer.send('test', b'this is the %dth test message' % i)
    producer.flush()


if __name__ == '__main__':
    app.start()

I want to create one producer variable per worker process, and I think the worker_process_init signal will help.

But I don't know how to declare a per-process producer variable that can then be used in the task function.

Can someone help me? Thanks.

All comments

Check the Celery docs in detail. Also, Kafka is not supported at the moment.
