[email protected] and [email protected] exhibit this issue, [email protected] does not.
```text
tokio-hang-reproduction v0.1.0 (/Users/bryan/personal/tokio-hang-reproduction)
│ │ │ ├── tokio v0.2.14
│ │ │ │ └── tokio-macros v0.2.5
│ │ │ └── tokio-util v0.3.1
│ │ │ └── tokio v0.2.14 (*)
│ │ ├── tokio v0.2.14 (*)
│ ├── tokio v0.2.14 (*)
└── tokio v0.2.14 (*)
```
Currently, I can reproduce this on AWS Lambda.
I cannot reproduce it on macOS (Darwin Bryans-MacBook-Pro.local 19.3.0 Darwin Kernel Version 19.3.0: Thu Jan 9 20:58:23 PST 2020; root:xnu-6153.81.5~1/RELEASE_X86_64 x86_64) or in Docker for Mac.
For our GraphQL server, we use AWS Lambda + Juniper + Tokio. Juniper is currently a synchronous library, but the Lambda library we use and our model layer are fully [email protected].
In order to get Juniper to call our models, we use futures::executor::block_on, based largely on this comment: https://github.com/tokio-rs/tokio/issues/2042#issuecomment-569919258
Starting with tokio 0.2.14, this approach hangs and causes timeouts. Downgrading to [email protected] solved the issue.
https://github.com/bryanburgers/tokio-hang-reproduction
```rust
#![allow(dead_code)]
use futures::stream::{self, StreamExt};
use tokio::time::delay_for;
use std::time::Duration;
use serde_json::{Value, json};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let fun = lambda::handler_fn(handler);
    lambda::run(fun).await?;
    Ok(())
}

async fn handler(_event: Value) -> Result<Value, String> {
    let result = invoke_juniper();
    // If we don't use juniper and .await the futures normally, it works fine.
    // let result = test1(100).await;
    println!("result={:?}", result);
    Ok(json!({}))
}

// Juniper (https://crates.io/crates/juniper) is a synchronous library.
fn invoke_juniper() -> Vec<usize> {
    // And from it, we call into our async code using block_on
    futures::executor::block_on(test1(100))
}

// All three of these functions fail after executing ~60 futures.
async fn test1(n: usize) -> Vec<usize> {
    stream::iter(0..n).map(pause_and_return).buffer_unordered(10).collect().await
}

async fn test2(n: usize) -> Vec<usize> {
    stream::iter(0..n).map(pause_and_return).buffered(10).collect().await
}

async fn test3(n: usize) -> Vec<usize> {
    let mut vec = Vec::new();
    for i in 0..n {
        vec.push(pause_and_return(i).await);
    }
    vec
}

async fn pause_and_return(n: usize) -> usize {
    println!("Starting {}", n);
    delay_for(Duration::from_millis(100)).await;
    println!("Finished {}", n);
    n
}
```
A coworker was just able to reproduce this on Linux without requiring Lambda.
Linux some.hostname 4.15.0-1060-aws #62-Ubuntu SMP Tue Feb 11 21:23:22 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
To run it without Lambda:
```rust
#![allow(dead_code)]
use futures::stream::{self, StreamExt};
use tokio::time::delay_for;
use std::time::Duration;
use serde_json::{Value, json};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    handler(Value::Null).await?;
    Ok(())
}

async fn handler(_event: Value) -> Result<Value, String> {
    let result = invoke_juniper();
    // If we don't use juniper and .await the futures normally, it works fine.
    // let result = test1(100).await;
    println!("result={:?}", result);
    Ok(json!({}))
}

// Juniper (https://crates.io/crates/juniper) is a synchronous library.
fn invoke_juniper() -> Vec<usize> {
    // And from it, we call into our async code using block_on
    futures::executor::block_on(test1(100))
}

// All three of these functions fail after executing ~60 futures.
async fn test1(n: usize) -> Vec<usize> {
    stream::iter(0..n).map(pause_and_return).buffer_unordered(10).collect().await
}

async fn test2(n: usize) -> Vec<usize> {
    stream::iter(0..n).map(pause_and_return).buffered(10).collect().await
}

async fn test3(n: usize) -> Vec<usize> {
    let mut vec = Vec::new();
    for i in 0..n {
        vec.push(pause_and_return(i).await);
    }
    vec
}

async fn pause_and_return(n: usize) -> usize {
    println!("Starting {}", n);
    delay_for(Duration::from_millis(100)).await;
    println!("Finished {}", n);
    n
}
```
What happens if you put the blocking call (invoke_juniper) in a tokio::task::spawn_blocking or tokio::task::block_in_place call?
A quick test with spawn_blocking does seem to work on Lambda with [email protected] and [email protected].
```rust
async fn handler(_event: Value) -> Result<Value, String> {
    // Run the synchronous Juniper call on Tokio's blocking threadpool.
    let result = tokio::task::spawn_blocking(invoke_juniper).await.map_err(|e| e.to_string())?;
    println!("result={:?}", result);
    Ok(json!({}))
}
```
Great! Thank you, @hawkw!
Can you help me understand why this helps? And is this a long-term fix or a short-term one?
I know you had mentioned previously that using futures::executor::block_on isn't ideal, but at the time there didn't seem to be a better way. I'm curious if there's a better way now?
So, the code inside the futures::executor::block_on is not _really_ running on the tokio executor. If a future inside of the block_on yields, it only yields to the executor created by the call to block_on, _not_ back to the Tokio runtime created by tokio::main.
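To make the point about block_on concrete, here is a minimal, std-only sketch of the general technique a block_on-style executor uses. It is not the futures crate's actual implementation; `ThreadWaker`, `block_on`, `Sleep`, and `sleep` are hypothetical names for illustration. The property that matters is that when the future returns `Pending`, control goes to this function's own park loop, and a wake only unparks this thread; nothing here hands the task back to the Tokio scheduler.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Waker that unparks the thread currently blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Runs one future to completion on the current thread (toy executor).
/// When the future returns Pending, this loop parks the thread; a wake only
/// unparks this thread, so the future is re-polled here and nowhere else.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Spurious unparks are harmless: we just poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical timer: a helper thread sleeps, marks it done, then wakes it.
struct Sleep {
    dur: Duration,
    done: Arc<AtomicBool>,
    started: bool,
}

fn sleep(dur: Duration) -> Sleep {
    Sleep { dur, done: Arc::new(AtomicBool::new(false)), started: false }
}

impl Future for Sleep {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.done.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        if !self.started {
            self.started = true;
            let (dur, done, waker) = (self.dur, Arc::clone(&self.done), cx.waker().clone());
            thread::spawn(move || {
                thread::sleep(dur);
                done.store(true, Ordering::SeqCst);
                waker.wake();
            });
        }
        Poll::Pending
    }
}
```

For example, `block_on(async { sleep(Duration::from_millis(20)).await; 7 })` parks the calling thread until the helper thread's wake arrives, then returns 7.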
I think this code breaks in 0.2.14 because 0.2.14 introduced automatic cooperative task yielding to improve the scheduler's fairness. Once your stream has iterated a certain number of times, in 0.2.14+ the task's budget is exhausted and it yields to the executor to give other tasks a turn. When that happens, it _should_ notify itself so that it goes to the end of the queue and continues executing after other tasks have had a chance to run. My guess is that, because of some interaction between futures::executor::block_on and running on a Tokio worker, your task is not being polled again after it yields.
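One way such a budget could interact badly with a nested executor can be shown with a toy model, under loudly stated assumptions: a thread-local counter (`BUDGET`, hypothetical) stands in for Tokio's cooperative budget, only the outer scheduler (`run_task`) refills it before each poll, and leaf operations (`Leaf`) yield once it reaches zero. This is not Tokio 0.2.14's code and the numbers are arbitrary. It only illustrates why a block_on-style loop running inside a task can stop making progress (the budget is never refilled because the outer scheduler never regains control), and why the same work completes on a thread where no budget is set, roughly the situation spawn_blocking puts you in.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Units of work the outer scheduler grants per poll (arbitrary toy value).
const BUDGET_PER_POLL: u32 = 32;

thread_local! {
    // Hypothetical per-thread budget. Threads that never run the toy
    // scheduler start out effectively unconstrained.
    static BUDGET: Cell<u32> = Cell::new(u32::MAX);
}

/// Waker that does nothing: both toy executors below re-poll in a loop anyway.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// A leaf operation (think: one timer) that spends one unit of budget.
/// With no budget left it yields: it requests a re-poll and returns Pending.
struct Leaf;

impl Future for Leaf {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        BUDGET.with(|b| {
            if b.get() == 0 {
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                b.set(b.get() - 1);
                Poll::Ready(())
            }
        })
    }
}

/// Awaits `n` leaf operations in sequence, similar in shape to test3.
async fn work(n: usize) -> usize {
    for _ in 0..n {
        Leaf.await;
    }
    n
}

/// Outer scheduler: refills the budget before every poll of the task.
/// Returns the output and how many polls it took.
fn run_task<F: Future>(fut: F) -> (F::Output, usize) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        BUDGET.with(|b| b.set(BUDGET_PER_POLL));
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}

/// Nested executor (a block_on inside a task): it re-polls but never refills
/// the budget. Gives up after `max_polls` so the stall shows up as None.
fn nested_block_on<F: Future>(fut: F, max_polls: usize) -> Option<F::Output> {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    for _ in 0..max_polls {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return Some(out);
        }
    }
    None
}
```

In this model, `run_task(work(100))` finishes in four polls (32 leaves per poll), while `run_task(async { nested_block_on(work(100), 1_000) })` yields `None`: the nested loop keeps re-polling a future whose budget never comes back. Running `nested_block_on(work(100), 1_000)` on a fresh thread returns `Some(100)`.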
Using spawn_blocking fixes this because the blocking code now runs on a separate blocking threadpool rather than on a Tokio worker thread. However, it's not ideal: when you use block_on to start executing async code again, that code is no longer running on the Tokio runtime. I think what you want is to run the blocking call with spawn_blocking and, when you need to call back into your async code, spawn it on the main Tokio runtime and use futures::executor::block_on (from the blocking thread) only to await the spawned future's JoinHandle. Ideally you wouldn't be doing _that_ either; the blocking code would just return and the async work would continue, rather than having the blocking code call back into asynchronous code. But if the third-party library doesn't work that way, I guess this is the closest you can get.
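A std-only sketch of that pattern: the async work stays on the runtime, and the blocking side only waits on a join handle. The toy runtime here (`Handle::start`, `Handle::spawn`, `YieldNow`, and `sync_callback` are all hypothetical names) is a single thread polling a task queue, and a `std::sync::mpsc::Receiver` plays the role of Tokio's `JoinHandle`. It is an illustration under those assumptions, not Tokio's API; in real code you would use Tokio's own spawn and spawn_blocking.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// A spawned task: its future plus the runtime queue it re-enters when woken.
struct Task {
    future: Mutex<Option<BoxFuture>>,
    queue: Mutex<Sender<Arc<Task>>>,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        // Waking puts the task back on the runtime's queue, not the caller's.
        let queue = self.queue.lock().unwrap().clone();
        let _ = queue.send(self);
    }
}

/// Cloneable handle to a toy runtime thread (not Tokio's real Handle API).
#[derive(Clone)]
struct Handle {
    queue: Sender<Arc<Task>>,
}

impl Handle {
    /// Starts one runtime thread that polls tasks as they are queued.
    fn start() -> Handle {
        let (tx, rx) = mpsc::channel::<Arc<Task>>();
        thread::spawn(move || {
            for task in rx {
                let waker = Waker::from(Arc::clone(&task));
                let mut cx = Context::from_waker(&waker);
                let mut slot = task.future.lock().unwrap();
                if let Some(mut fut) = slot.take() {
                    if fut.as_mut().poll(&mut cx).is_pending() {
                        *slot = Some(fut); // keep it until the next wake
                    }
                }
            }
        });
        Handle { queue: tx }
    }

    /// Spawns `fut` on the runtime; the Receiver plays the JoinHandle role.
    fn spawn<F>(&self, fut: F) -> Receiver<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let wrapped: BoxFuture = Box::pin(async move {
            let _ = tx.send(fut.await);
        });
        let task = Arc::new(Task {
            future: Mutex::new(Some(wrapped)),
            queue: Mutex::new(self.queue.clone()),
        });
        let _ = self.queue.send(task);
        rx
    }
}

/// Returns Pending once (after asking to be re-polled), then Ready.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical stand-in for the synchronous callback Juniper invokes.
/// It never polls the future itself; it only blocks on the join receiver.
fn sync_callback(rt: &Handle) -> usize {
    let join = rt.spawn(async {
        YieldNow(false).await;
        40_usize + 2
    });
    join.recv().expect("runtime dropped the task")
}
```

Calling `sync_callback` from a plain OS thread (standing in for the blocking pool) returns 42; every wake-up along the way goes to the runtime thread's queue rather than to the blocked caller.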
Calling futures::executor::block_on(...) should never be done from within the context of a tokio task. Unfortunately, we do not control that function as it is from futures. If we did, we would make it panic.
The solution is indeed to wrap blocking calls with spawn_blocking.
I understand that I shouldn’t be using futures::executor::block_on.
I understand that spawn_blocking can get me from an async context to a sync context so I can call the third party code (in this case, Juniper).
What I don’t understand is this: when Juniper calls code I provided to it (assuming a synchronous context), how can I get back into an async context to get data that I can return to Juniper?
That’s why futures::executor::block_on is appealing (even though it’s wrong). It has the right signature.
I need something that is roughly fn _(fut: impl Future<Output=T>) -> T.
I can’t use runtime.block_on because in this context it’s not possible to have a mutable reference to the runtime. Handle doesn’t have a block_on. If the JoinHandle returned from a handle.spawn can be waited on from a synchronous context, I don’t see how.
I really wish I didn’t have to do the async -> sync -> async dance, but at this point I don’t have a choice. I’m really looking for _any_ reliable way to make that second jump.
Use spawn_blocking when you go from async -> sync. If you do that, you 100% can use futures::executor::block_on to go from sync -> async.
> I can’t use runtime.block_on because in this context it’s not possible to have a mutable reference to the runtime. Handle doesn’t have a block_on. If the JoinHandle returned from a handle.spawn can be waited for from a synchronous context, I don’t see it.
What I was trying to suggest is just that (IMO) you would probably prefer your async code in the callback to be running on the tokio executor alongside your other async code, rather than on the separate executor created by block_on. So, if you use spawn_blocking to go from async to sync, you would want to use futures::executor::block_on with a JoinHandle rather than passing your future into block_on directly.
We're experiencing the same problem in Deno: after flushing a few strings to stdout, the executor hangs. Similar to @bryanburgers, we use futures::executor::block_on() to block the current thread on the write future.
Contrary to @carllerche's advice, we call block_on() from within the context of a Tokio task:
(the problematic bit is at the end of the function)
spawn_blocking and block_in_place are not really viable solutions in this case because we use a single-threaded Tokio runtime and our futures are not Send. Is there a way to manually reset the task budget or ignore the budget altogether?
EDIT: Upgraded to Tokio 0.2.16 and everything works as expected 😄
@bartlomieju Using block_on in a single-threaded executor sounds like a recipe for deadlock disaster. I very strongly recommend finding another solution.
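A toy illustration of that deadlock risk (std only; `run_single_threaded` and `blocked_wait_outcome` are hypothetical names, and this is not how Tokio's basic scheduler is implemented). Task A blocks the executor's only thread waiting for a value that task B would send, but B cannot be polled until A returns. A real blocking wait would hang forever; the 200 ms timeout only makes the outcome observable.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};
use std::time::Duration;

/// Waker that does nothing; the tasks below never return Pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

type LocalTask = Pin<Box<dyn Future<Output = ()>>>;

/// Toy single-threaded executor: polls queued tasks in order on this thread.
fn run_single_threaded(mut queue: VecDeque<LocalTask>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while let Some(mut task) = queue.pop_front() {
        if task.as_mut().poll(&mut cx).is_pending() {
            queue.push_back(task); // toy: re-queue instead of waiting for a wake
        }
    }
}

/// Task A blocks the only thread on a value that task B would send, so B
/// cannot run until A gives up. Without the timeout this would never return.
fn blocked_wait_outcome() -> Result<i32, RecvTimeoutError> {
    let (tx, rx) = mpsc::channel::<i32>();
    let outcome: Rc<RefCell<Option<Result<i32, RecvTimeoutError>>>> =
        Rc::new(RefCell::new(None));
    let slot = Rc::clone(&outcome);
    let mut queue: VecDeque<LocalTask> = VecDeque::new();
    queue.push_back(Box::pin(async move {
        // Blocking call inside a task, like block_on waiting on another task's work.
        *slot.borrow_mut() = Some(rx.recv_timeout(Duration::from_millis(200)));
    }));
    queue.push_back(Box::pin(async move {
        let _ = tx.send(1);
    }));
    run_single_threaded(queue);
    let result = outcome.borrow_mut().take().expect("task A ran");
    result
}
```

`blocked_wait_outcome()` returns `Err(RecvTimeoutError::Timeout)`: task B, which would have supplied the value, only gets polled after A has already given up.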