This problem occurs on the master branch of Celery: calling producer.flush() makes the Celery worker hang.
The code is as follows:
from celery import Celery
from kafka import KafkaProducer

app = Celery('test', broker='redis://127.0.0.1:6379/0')
producer = KafkaProducer(bootstrap_servers=['172.16.24.45:9092', '172.16.24.44:9092'])

@app.task
def send_msg():
    # producer = KafkaProducer(bootstrap_servers=['172.16.24.45:9092', '172.16.24.44:9092'])
    for i in range(10):
        producer.send('test', b'this is the %dth test message' % i)
    producer.flush()

if __name__ == '__main__':
    app.start()
I want to create one producer per worker process, and I think the worker_process_init signal will help.
But I don't know how to declare a per-process producer variable that can then be used in the task function.
Can someone help me? Thanks.
Check the Celery docs in detail. Also, Kafka is not supported right now.