I'm trying to get Sanic (and uvloop) to use a worker pool of sub-processes for CPU-intensive requests. So far I've come up with the code below. The example "work" is first run once, before Sanic starts, to ensure the process pool works as intended. Then Sanic is started and the user can trigger the same "work" through HTTP.
import asyncio
import concurrent.futures
import os
import time
import uvloop
from sanic import Sanic, response, config
# Define a CPU bound worker function
def deep_thought(data):
    print("Working to answer: " + data)
    time.sleep(5)  # Actually not CPU bound in this example, but outcome should be identical
    return 42
# Enable extra asyncio debugging
os.environ['PYTHONASYNCIODEBUG'] = "1"
# Make sure uvloop is used as event loop by default
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Instantiate an event loop object for main thread
loop = asyncio.get_event_loop()
# Create ProcessPoolExecutor
executor = concurrent.futures.ProcessPoolExecutor(max_workers=5)
# Define a coroutine that will call the CPU bound function in a sub process
async def ask_question():
    res = await loop.run_in_executor(
        executor,
        deep_thought,
        "What's the answer to life, the universe and everything? (no sanic)",
    )
    print('Answer (no sanic): ' + str(res))
# Create a task and run it in the event loop
print('Asking question without sanic started')
task = loop.create_task(ask_question())
loop.run_until_complete(task)
# Create the Sanic app which should(?!?) use the same event loop as created above
config.Config.LOGO = None
app = Sanic()
# Endpoint to test server responsiveness
@app.route("/")
async def root(request):
    return response.json({"root": True})
# CPU bound endpoint
@app.route("/ask")
async def ask(request):
    res = await loop.run_in_executor(
        executor,
        deep_thought,
        "What's the answer to life, the universe and everything? (sanic)"
    )
    return response.json({"result": res})
# Run the Sanic app (which will start the event loop again)
app.run(host="0.0.0.0", port=12000, debug=True)
I run the above using Sanic from the master branch and uvloop 0.8.1 on Python 3.6.3. The first run of the worker function in a subprocess (before starting Sanic) works fine. However, when initiating the work through Sanic (http://localhost:12000/ask) it bombs with an exception:
$ python3.6 ./test_sanic_subprocess.py
Asking question without sanic started
Working to answer: What's the answer to life, the universe and everything? (no sanic)
Answer (no sanic): 42
[2018-03-23 10:24:16 +0100] [19475] [INFO] Goin' Fast @ http://0.0.0.0:12000
[2018-03-23 10:24:16 +0100] [19475] [INFO] Starting worker [19475]
[2018-03-23 10:24:19 +0100] - (sanic.access)[INFO][127.0.0.1:51520]: GET http://localhost:12000/ 200 13
[2018-03-23 10:24:23 +0100] [19475] [ERROR] Traceback (most recent call last):
File "/home/kinware/src/sanic/sanic/app.py", line 597, in handle_request
response = await response
File "/home/kinware/python/lib/python3.6/asyncio/coroutines.py", line 128, in throw
return self.gen.throw(type, value, traceback)
File "./test_subprocess_nosanic.py", line 59, in ask
"What's the answer to life, the universe and everything? (sanic)"
RuntimeError: Task <Task pending coro=<Sanic.handle_request() running at /home/kinware/src/sanic/sanic/app.py:597> created at /home/kinware/src/sanic/sanic/server.py:304> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel()] created at ./test_subprocess_nosanic.py:59> attached to a different loop
Working to answer: What's the answer to life, the universe and everything? (sanic)
[2018-03-23 10:24:23 +0100] [19475] [ERROR] Exception occurred while handling uri: "http://localhost:12000/ask"
Traceback (most recent call last):
File "/home/kinware/src/sanic/sanic/app.py", line 597, in handle_request
response = await response
File "/home/kinware/python/lib/python3.6/asyncio/coroutines.py", line 128, in throw
return self.gen.throw(type, value, traceback)
File "./test_subprocess_nosanic.py", line 59, in ask
"What's the answer to life, the universe and everything? (sanic)"
RuntimeError: Task <Task pending coro=<Sanic.handle_request() running at /home/kinware/src/sanic/sanic/app.py:597> created at /home/kinware/src/sanic/sanic/server.py:304> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel()] created at ./test_subprocess_nosanic.py:59> attached to a different loop
[2018-03-23 10:24:23 +0100] - (sanic.access)[INFO][127.0.0.1:51520]: GET http://localhost:12000/ask 500 2977
It appears from the exceptions that there are multiple event loops in play here, but I'm not quite sure why or where they are created. Maybe I'm misunderstanding the Python event loop concept? I'm also not sure what happens when the current process fork()s and the child worker processes inherit the event loop from the master process.
Any ideas what I'm doing wrong here?
If I can get it working I'd be happy to submit a PR for sanic/examples/ showing how to handle CPU intensive work in a subprocess in conjunction with the normal async processing sanic does so well.
Digging into the Sanic source, I could see that there is a difference in how the loop is handled when launching Sanic through Sanic.run() vs Sanic.create_server().
Sanic.run() uses
run_async=False # Implicit from default parameter of Sanic._helper()
loop=None # Implicit from default parameter of Sanic._helper()
which causes sanic.server.serve() to create a new loop by doing:
if not run_async:
    # create new event_loop after fork
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
Sanic.create_server() uses
run_async=True # Explicitly set in create_server()
loop=get_event_loop() # Explicitly set in create_server()
This does not create a new loop, but it also leaves it to the caller to run the actual event loop (as opposed to Sanic.run() which conveniently does run the loop).
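To convince myself of the Sanic.run() behaviour, here is a minimal stdlib-only sketch of what I believe is going on. The function serve_like_sanic_run() is a hypothetical stand-in that only mimics the new_event_loop()/set_event_loop() lines quoted above; it is not Sanic code. It shows that a loop captured at module level is not the loop a handler later runs on.

```python
import asyncio

# Stand-in for the module-level loop captured before the server starts.
early_loop = asyncio.new_event_loop()
asyncio.set_event_loop(early_loop)


def serve_like_sanic_run():
    """Hypothetical helper: mimics the quoted branch by installing a fresh loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def handler():
        # Inside a running coroutine this returns the loop that is executing it.
        return asyncio.get_event_loop()

    try:
        return loop.run_until_complete(handler())
    finally:
        loop.close()


serving_loop = serve_like_sanic_run()
loops_differ = serving_loop is not early_loop
print(loops_differ)  # True

# Clean up the loop that was created first.
early_loop.close()
asyncio.set_event_loop(None)
```

A future created via early_loop.run_in_executor() belongs to early_loop, so awaiting it from a task on serving_loop is what produces the "attached to a different loop" error.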
So there are two ways to get rid of the exceptions.
One is to just let Sanic create its own loop and update the code to use that loop. This is easily accomplished by changing loop.run_in_executor() to asyncio.get_event_loop().run_in_executor() (or possibly request.app.loop.run_in_executor()).
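A rough sketch of this first option, without Sanic so it can be run anywhere. Two assumptions to flag: the fresh loop created at the bottom only imitates what Sanic.run() does, and I swapped in ThreadPoolExecutor so the sketch does not depend on the platform's process start method. A ProcessPoolExecutor would be passed in the same argument position.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def deep_thought(data):
    # Placeholder for the CPU-bound work from the original example.
    return 42


# Assumption: thread pool used for portability of this sketch only.
executor = ThreadPoolExecutor(max_workers=2)


async def ask(question):
    # Look the loop up at call time instead of using a module-level variable,
    # so the future is created on whichever loop is running this coroutine.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(executor, deep_thought, question)


# Imitate Sanic.run(): a new loop is created and installed before serving.
serving_loop = asyncio.new_event_loop()
asyncio.set_event_loop(serving_loop)
try:
    answer = serving_loop.run_until_complete(ask("What's the answer?"))
finally:
    serving_loop.close()
    asyncio.set_event_loop(None)
    executor.shutdown()

print(answer)  # 42
```

In a real handler the only change is the loop lookup on the first line of ask(); the executor itself can stay at module level.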
The alternative is to run the event loop outside of Sanic and switch from Sanic.run() to Sanic.create_server(), which should correctly pick up the existing event loop because it uses get_event_loop() internally (instead of new_event_loop(), as Sanic.run() does).
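The second option boils down to "the caller owns the loop". Here is a stdlib-only sketch of that pattern. Note that fake_server() is a hypothetical stand-in for the coroutine returned by Sanic.create_server(), and ThreadPoolExecutor is again swapped in only to keep the sketch portable.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def deep_thought(data):
    # Placeholder for the CPU-bound work from the original example.
    return 42


# Assumption: thread pool used for portability of this sketch only.
executor = ThreadPoolExecutor(max_workers=2)

# The caller creates the loop once and keeps ownership of it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)


async def ask(question):
    # Using the module-level loop is safe here only because the same loop
    # is the one that ends up running this coroutine.
    return await loop.run_in_executor(executor, deep_thought, question)


async def fake_server():
    """Hypothetical stand-in for the coroutine from Sanic.create_server()."""
    return await ask("asked while serving")


try:
    warmup = loop.run_until_complete(ask("asked before serving"))
    served = loop.run_until_complete(fake_server())
finally:
    loop.close()
    asyncio.set_event_loop(None)
    executor.shutdown()

print(warmup, served)  # 42 42
```

The trade-off is that the caller now has to start and stop the loop, which Sanic.run() otherwise does for you.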
This solves the problem for me. Maybe someone else finds this useful too.
Closing issue.