Sanic: how to use apscheduler with multiple workers

Created on 8 Nov 2017  ·  22 Comments  ·  Source: sanic-org/sanic

Issue https://github.com/channelcat/sanic/issues/743 covers the case of using apscheduler quite well if you are using a single worker. The issue is that with multiple workers, each scheduled task runs once per worker. If you only want something to run once every 4 hours, this is undesirable.

How ought one to use multiple workers, as with aiohttp and the like, while also using apscheduler?


All 22 comments

You need to run APScheduler in a dedicated worker process and connect to it via some form of RPC (Remote Procedure Calls). There's an example here. Some adaptation is required for use in an async environment.
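A minimal sketch of that architecture, using only the standard library: one dedicated process owns the scheduler, and other processes send it commands over a channel. The multiprocessing queue here stands in for a real RPC layer such as RPyC, and the command names are made up for illustration.

```python
import multiprocessing as mp

def scheduler_process(commands, results):
    # Dedicated process: in a real setup this would run an APScheduler
    # instance and translate incoming commands into add_job()/remove_job().
    jobs = {}
    while True:
        cmd = commands.get()
        if cmd is None:                  # shutdown sentinel
            break
        op, job_id = cmd
        if op == "add":
            jobs[job_id] = "scheduled"
            results.put((job_id, "added"))
        elif op == "remove":
            jobs.pop(job_id, None)
            results.put((job_id, "removed"))

if __name__ == "__main__":
    commands, results = mp.Queue(), mp.Queue()
    proc = mp.Process(target=scheduler_process, args=(commands, results))
    proc.start()
    # Any web worker can now ask the single scheduler process to add a job,
    # so the job exists exactly once no matter how many workers there are.
    commands.put(("add", "tick"))
    print(results.get())                 # ('tick', 'added')
    commands.put(None)
    proc.join()
```

Because only one process ever touches the job store, the once-per-worker duplication from the original question cannot occur.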

Another solution would be to use a shared job store and subclass the scheduler to use some form of distributed locking, so only one scheduler instance can retrieve the due jobs from the job store. I have no concrete examples of this however.

The problem with shared job stores is how to notify the scheduler that it needs to wake up sooner because a job has been added with an earlier next run time than the earliest job before that. I have no good solution for that.

I'm a little confused as to why this is a tough solve, so maybe I'm missing something.

Couldn't you simply have the state of a job change to "claimed" when a worker takes it, then have all workers check the jobstore at regular intervals for unclaimed jobs? First come, first served. An option could prevent a single worker from taking multiple jobs at once, to help distribute the work load.

I'm in a scenario where I'll have 16 instances of the same application running across two hosts for high-availability purposes. That's 16 workers to distribute work between. The tougher problem from my perspective is preventing schedulers from setting the same job 16 times into the queue.

I'm a little confused as to why this is a tough solve, so maybe I'm missing something.

APScheduler does not work like Celery does. It was not designed for high availability. It was designed for easily running scheduled tasks within a single application process. Perhaps this is the source of your confusion?

Couldn't you simply have the state of a job change to "claimed" when a worker takes it, then have all workers check the jobstore at regular intervals for unclaimed jobs? First come, first served. An option could prevent a single worker from taking multiple jobs at once, to help distribute the work load.

You're describing Celery here. With APScheduler, the scheduler itself distributes the work load. There is also the problem of detecting a dead scheduler, to prevent jobs from staying in the "claimed" state for all eternity. The problem is hard because solving it requires an almost complete redesign of the APScheduler library.

It seems that there is still no progress. Hmm!

Sanic does not expose attributes related to worker ids, so we cannot bind the scheduler to a specific worker process.

So I think we should find a way to do some locking outside the app instance that is shared between processes. One way is the Python built-in fcntl module: https://docs.python.org/3.6/library/fcntl.html?highlight=fcntl#fcntl.lockf

The fcntl module can place a cross-process lock on a file, so we can try to acquire an exclusive lock on a file before starting APScheduler; only the worker that wins the lock starts it.

You can refer to the code below, based on #743:

import fcntl
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sanic import Sanic
from sanic.response import json

async def tick():
    print('Tick! The time is: %s' % datetime.now())

app = Sanic()

@app.listener('before_server_start')
async def initialize_scheduler(app, loop):
    try:
        # Keep the lock file handle on the app so it is not garbage-collected;
        # closing the file would silently release the lock.
        app.scheduler_lock = open("/tmp/sanic.lock", "w")
        fcntl.lockf(app.scheduler_lock.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)

        # Only the worker that acquired the lock reaches this point.
        scheduler = AsyncIOScheduler({'event_loop': loop})
        scheduler.add_job(tick, 'interval', seconds=1)
        scheduler.start()
    except BlockingIOError:
        # Another worker already holds the lock; skip the scheduler here.
        pass

@app.route("/")
async def test(request):
    return json({"hello": "world"})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=True)

@liuqian1989 Assuming that you have 4 workers and use your example above, if, say, the scheduler is initialized in the first worker, how would the 4th worker access the scheduler?

It wouldn't be, hence the suggestion to launch it as a separate process.

@agronholm As I have suggested on another thread, I have tried your example of using the RPC service, but so far I have been unable to use it with Sanic. Your sync example works, but when I put it in an async environment I get a bunch of errors that I cannot resolve.

Further, if I simply create the scheduler as

scheduler = AsyncIOScheduler()

using your exact example without changing anything else, nothing gets processed and I am unsure how I might achieve what I need to achieve.

I would love to see a working example of this RPC working with Sanic if it is possible.

What exact problems were you having when using the RPC with Sanic?

  1. If I switch to AsyncIOScheduler() with your code example, nothing gets called.

Why would you switch the scheduler to AsyncIOScheduler in the server code? The example isn't even running the asyncio event loop!

I put the asyncio event loop into the server. I need to run async methods. Do I not use AsyncIOScheduler if I need to run async methods?

You were supposed to run the scheduler in a separate process, not inside your web application! Your web application acts as an RPC client for the scheduler process. This also means that you need to use run_in_executor() to call any remote procedures, at least if RPyC is used. Another option would be to make an HTTP API around the scheduler, like how flask-apscheduler did it.
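To illustrate the run_in_executor() point: a blocking RPC client call can be off-loaded to the default thread pool so it does not stall the event loop while it waits. This is a minimal sketch; blocking_rpc_call is a hypothetical stand-in for a real RPyC proxy method.

```python
import asyncio
import time

def blocking_rpc_call(job_id):
    # Stand-in for a blocking RPC client call (e.g. an RPyC proxy method);
    # hypothetical name, for illustration only.
    time.sleep(0.01)
    return "scheduled %s" % job_id

async def handler():
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread pool executor so the
    # event loop keeps serving other requests in the meantime.
    result = await loop.run_in_executor(None, blocking_rpc_call, "tick")
    return result

print(asyncio.run(handler()))  # prints: scheduled tick
```

Calling blocking_rpc_call directly with await would block the whole loop; run_in_executor is what makes a synchronous RPC client usable from an async handler.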

server = the separate process. I use the term server in reference to the example that you have created. I _am_ running the web application as my client. In any case, I will look into these options… if only someone would make a sanic-apscheduler which deals with everything for us!

I am a bit confused now. So you're saying that in the process that exclusively runs the scheduler, you need to run coroutine functions as scheduled tasks on top of an asyncio event loop? Or what other reason do you have for running asyncio on that scheduler process?

Correct: the functions/tasks that I need to run are async. Theoretically speaking, I could possibly write sync versions for all of them, but there are many of them and I would prefer to use my existing code.

In that case your best option is to wrap the scheduler functionality in a REST API which is then accessed by your web workers.
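A rough sketch of that idea, deliberately using only the standard library so it stays self-contained: the dedicated scheduler process exposes a small HTTP endpoint that web workers POST jobs to. The /jobs path, the payload shape, and the jobs dict (standing in for a real APScheduler instance) are all assumptions made for illustration.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

jobs = {}  # stand-in for a real APScheduler instance

class SchedulerAPI(BaseHTTPRequestHandler):
    def do_POST(self):
        # Expects POST /jobs with a JSON body like {"id": "tick", "interval": 4}
        length = int(self.headers["Content-Length"])
        job = json.loads(self.rfile.read(length))
        jobs[job["id"]] = job          # real code: scheduler.add_job(...)
        self.send_response(201)
        self.end_headers()
        self.wfile.write(json.dumps({"added": job["id"]}).encode())

    def log_message(self, *args):      # silence per-request logging
        pass

def serve(port=8765):
    # Runs the scheduler's HTTP API in a background thread and returns
    # the server so the caller can shut it down.
    server = HTTPServer(("127.0.0.1", port), SchedulerAPI)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Each Sanic worker then registers jobs with an HTTP client instead of owning a scheduler itself, so the scheduling state lives in exactly one process.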

Noted. I will try that. Thanks for your help!

@agronholm I have implemented the RESTful API as you have suggested (actually did it in Sanic also!) and it is now working smoothly — thank you again for the suggestion!

@liuqian1989 Locking on a file is a very bad approach: launching 17 workers and having them wait while only one does the work is "so bad".

@agronholm I have implemented the RESTful API as you have suggested (actually did it in Sanic also!) and it is now working smoothly — thank you again for the suggestion!

Hello, could you share the code you mentioned? Thanks!
