Aiohttp: Allow to register application background tasks within an event loop

Created on 17 Aug 2016  路  9Comments  路  Source: aio-libs/aiohttp

Long story short

I run an aiohttp application with Gunicorn behind nginx.
The application exposes two websocket connection endpoints for mobile clients: one for RPC and other for Pub/Sub.

When some event is occurred in the system (e,g, there're some news from company for clients) I want to notify all the connected websockets that I store e.g. in my app['pub_sub_websockets']. So basically I would iterate them in a loop and send bytes to each one.

Each application instance that Gunicorn runs in a separate process has own collection of connected websockets. But I have to notify all the websockets connected to all workers (or even to all workers on all server nodes). Therefore I would proxy the event through some messaging system like Redis Pub/Sub or ZeroMQ. In my case it's ZeroMQ.

The problem is I can't easily setup the "listener" coroutine that will run within the application's event loop to subscribe to ZeroMQ proxy and forward messages to connected websockets.

Therefore I feel like I have to create own ZeroMQ socket zmq.SUB for each of connected websockets inside my websocket request handler and there gather two coroutines - the first will be listening on ws and the second one will be listening on ZeroMQ's socket. So that the amount of ZeroMQ's zmq.SUB sockets will grow linearly depending to the amount of clients connected to the application process via websockets.

Even with this approach I can't tell the application (actually GunicornWebWorker) to use zmq.asyncio.ZMQEventLoop that's required to use with zmq.asyncio.

Expected behaviour

1) I propose to provide an interface from the web.Application like Application.register_background_task(coro) that will allow to register as many tasks as needed during the application instance setup to run along with GunicornWebWorker._runner within the event loop the worker creates, say using asyncio.gather().

In this way in my particular case I'd create only one ZeroMQ socket to listen for a topic that will run within the event loop instead of one ZMQ socket per one connected WebSocket.

And if I have to listen more event sources (message queues from different providers, etc.) I'll add one more socket (or other consuming object depending to the event source provider) and not one more thousand sockets for each thousand of currently connected websockets.

2) Also I think there should be some API to choose which event loop class to use inside the GunicornWebWorker.init_process().
Say if I understand the ZMQ's documentation correctly I must use zmq.asyncio.ZMQEventLoop to deal with ZeroMQ in my asynchronous application but I can't set this type of loop in the worker without creating of own worker class inherited from GunicornWebWorker.

Maybe the loop creation logic should be separated in own public method that could be overridden or there should be just an option to choose a class of the event loop to use within the worker. By the way maybe then we won't need a separate GunicornUVLoopWebWorker class if we'll be able to set an event loop class somewhere?

Please tell me if it's possible to introduce the stuff described above in the aiohttp library? Maybe it's just my architectural approach is totally wrong itself? Then I'd be glad to hear some good guidance to correct it.

outdated

Most helpful comment

All work is done.
@f0t0n thank you again

All 9 comments

From my point of view you are requested for on_startup signal in Application.

It should be pretty close to already existing .on_shutdown and .on_cleanup signals (see Graceful Shutdown doc section.

If you want to contribute the new signal -- you are welcome. I really appreciate it.
The change is trivial from my perspective but unfortunately it's not number one priority, so please don't wait for me but create a Pull Request. I'll help you by code review.

Regarding to zmq.asyncio.ZMQEventLoop.
It's based on zmq.Poller which in turn is based on zeromq C++ implementation.
It's not bad but poller is not built on top of epoll syscall but poll for non-zmq sockets IIRC.
It means the poller is super fast for zmq sockets but has performance problem with many opened regular file descriptors (read it a the poller is bad for supporting many parallel http connections and especially web-sockets).
aiozmq has _loop-less_ implementation which works with epoll in native way (used by default). BTW it allows you to run zmq sockets with uvloop.
I'd love to see _loop-less_ support in zmq.asyncio but personally now I have no motivation to contribute the change.
I've used zmq for a while, that's why @popravich and me have created aiozmq library. But now we don't use it for our daily job.
But if anybody will raise an issue for zmq project I can point on required operations (it's really tricky) and participate in reviewing. But, sorry, I don't want to write code/tests/docs for solving the problem.

@asvetlov Thanks for such thoughtful answer.

I've started to check the signals and even to write some code. But if we gather the app.startup() with the request handler (see a code snippet below) within web.run_app() (or within aiohttp.worker.GunicornWebWorker), all the on_startup signal handlers will run consecutively in a for loop.
In this way we can't add a few tasks that will run independently along with the request handler in the event loop. Say if one on_startup task is still listening on ZMQ socket, the other one can't start to listen something else, say Redis or whatever. How do you think we should deal with this?

# inside web.run_app() function:
server = loop.create_server(handler, host, port,
                            ssl=ssl_context, backlog=backlog)

loop.run_until_complete(asyncio.gather(server, app.startup()))

UPDATE

By the way can we use nested asyncio.gather()? For example:

def some_stuff_to_run_first():
    pass


async def listen_zmq(app):
    pass


async def listen_redis(app):
    pass


async def listen_for_notifications(app):
    return await asyncio.gather(listen_zmq(app), listen_redis(app), loop=app.loop)


app.on_startup.append(some_stuff_to_run_first)
app.on_startup.append(listen_for_notifications)

And then it will be called in the web.run_app() and in the gunicorn worker as

loop.run_until_complete(asyncio.gather(server, app.startup()))

In this way we'll run everything that need to complete first in that for loop inside Signal and after that will run long running tasks that will leave till application is alive or e.g. till their sockets will receive a termination message.

@asvetlov I've started to work on this but got problems with tests I don't get how to solve (see: https://github.com/f0t0n/aiohttp/pull/1#issuecomment-241214907). I'd appreciate any input on this.

Your code snippet is incorrect a little bit: .on_startup is a list-like object but not callable.
To subscribe on signal use .on_startup.append(create_redis_listener).
.on_cleanup may send cancellation to all registered long-running tasks.

Ah sure, you're right. Just edited it.

Should we close it or you do prefer to fix gunicorn worker before?

Let me fix worker, yea.

All work is done.
@f0t0n thank you again

This thread has been automatically locked since there has not been
any recent activity after it was closed. Please open a [new issue] for
related bugs.

If you feel like there's important points made in this discussion,
please include those exceprts into that [new issue].

Was this page helpful?
0 / 5 - 0 ratings