Celery: How to use AsyncResult.then in async/await

Created on 30 Oct 2017 · 1 Comment · Source: celery/celery

from gevent import monkey

monkey.patch_all()
import asyncio

import time
from celery import Celery
from celery.result import AsyncResult

app = Celery(broker='amqp://xxx', backend='redis://:xxx')

app.config_from_object('config')


@app.task(name='tasks.add')
def add(x, y):
    return x + y


def on_result_ready(result: AsyncResult):
    print('Received result for id %r: %r' % (result.id, result.result,))
    result.forget()
    print("forget end")


async def test(my_num):
    # Send the task, then register a callback that runs once the result is ready.
    res: AsyncResult = add.apply_async(args=[my_num, my_num])
    res.then(on_result_ready)


if __name__ == "__main__":
    # Schedule four tasks per iteration, ten iterations in total.
    for _ in range(10):
        for n in (1, 2, 3, 4):
            asyncio.ensure_future(test(n))

    asyncio.get_event_loop().run_forever()

I have tried this code in my project, and it works fine.
However, it seems to work only after I import gevent and call monkey.patch_all().
I'm coding with Python 3.6, and async/await works very well for me, but the gevent patch makes the queue non-blocking, which causes many errors.
So what can I do?
Should I change which gevent patches are applied?

__all__ = [
    'patch_all',
    'patch_builtins',
    'patch_dns',
    'patch_os',
    'patch_select',
    'patch_signal',
    'patch_socket',
    'patch_ssl',
    'patch_subprocess',
    'patch_sys',
    'patch_thread',
    'patch_time',
    # query functions
    'get_original',
    'is_module_patched',
    'is_object_patched',
    # module functions
    'main',
]

Or is there something else I can do?
Changing my code to use queue.get_nowait may not be suitable for me.

from gevent import monkey
monkey.patch_all()

All comments

Currently Celery doesn't support the asyncio library.
We plan to change that in Celery 5.0 but we haven't found the time to implement it.
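
Until native asyncio support exists, one common workaround (not something Celery itself provides) is to leave gevent out and run the blocking wait in a thread pool through asyncio's loop.run_in_executor. The sketch below is only an illustration under stated assumptions: blocking_get and wait_in_thread are hypothetical names, and blocking_get is a stand-in for a blocking call such as AsyncResult.get(), so that the example runs without a broker or result backend.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_get(x, y):
    # Hypothetical stand-in for a blocking call such as AsyncResult.get().
    # With Celery you would pass the bound method (e.g. res.get) instead.
    time.sleep(0.1)
    return x + y


async def wait_in_thread(executor, fn, *args):
    # Run the blocking callable in a worker thread so the event loop
    # stays free to service other coroutines in the meantime.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(executor, fn, *args)


async def main():
    with ThreadPoolExecutor(max_workers=4) as executor:
        # gather() returns results in the order the awaitables were passed.
        return await asyncio.gather(
            *(wait_in_thread(executor, blocking_get, n, n) for n in range(1, 5))
        )


# new_event_loop() is used instead of asyncio.run() to stay compatible
# with Python 3.6, the version mentioned in the question.
loop = asyncio.new_event_loop()
results = loop.run_until_complete(main())
loop.close()
print(results)
```

With a real task, the coroutine would call add.apply_async(...) first and then await wait_in_thread(executor, res.get). Each pending result occupies one worker thread while it waits, so the pool size bounds how many results can be awaited at once.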
