I'm not sure whether this is a hyper, tokio_timer or general futures issue. It seems similar to #1353, but that issue suggests this is fixed by using no_proto on the Client, and I see the same problem whether or not I use no_proto.
Originally raised as https://github.com/jimmycuadra/rust-etcd/issues/20, but I managed to isolate it to hyper.
The trigger seems to be applying a tokio_timer::Timeout to a Client.get() that has already returned headers with chunked Transfer-Encoding but has not received any chunks yet. When the timeout fires, the connection is never closed.
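For context on why "headers received, no chunks yet" is a special state: with `Transfer-Encoding: chunked` there is no Content-Length, so the only way a client learns the body has ended is the zero-length last chunk (`0\r\n\r\n`). The std-only sketch below illustrates that framing rule; it is not hyper's decoder, `decode_chunked` is a hypothetical helper, and chunk extensions and trailers are deliberately unsupported.

```rust
/// Hypothetical helper: decodes an HTTP/1.1 chunked body from `buf`
/// (the bytes after the response headers). Returns `Some(body)` only once the
/// zero-length last chunk has arrived; returns `None` while the body is still
/// incomplete (and, in this sketch, also for malformed input).
/// Chunk extensions and trailer fields are not supported.
fn decode_chunked(buf: &[u8]) -> Option<Vec<u8>> {
    let mut body = Vec::new();
    let mut pos = 0;
    loop {
        // Each chunk starts with its size in hex, terminated by CRLF.
        let line_end = pos + find_crlf(&buf[pos..])?;
        let size_str = std::str::from_utf8(&buf[pos..line_end]).ok()?;
        let size = usize::from_str_radix(size_str.trim(), 16).ok()?;
        pos = line_end + 2;
        if size == 0 {
            // Last chunk: the message ends with one more CRLF (no trailers here).
            return if buf.get(pos..pos + 2) == Some(&b"\r\n"[..]) {
                Some(body)
            } else {
                None
            };
        }
        // Wait for the chunk data plus the CRLF that follows it
        // (that CRLF is skipped, not validated, in this sketch).
        let end = pos.checked_add(size)?.checked_add(2)?;
        if buf.len() < end {
            return None;
        }
        body.extend_from_slice(&buf[pos..pos + size]);
        pos = end;
    }
}

/// Position of the first CRLF in `buf`, if any.
fn find_crlf(buf: &[u8]) -> Option<usize> {
    buf.windows(2).position(|w| w == b"\r\n")
}
```

With only the headers received, the body bytes are empty and `decode_chunked(b"")` is `None`: the response is neither finished nor safely reusable, which is where the timed-out request in this report sits.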
Nothing obvious in trace-level logs.
Repro instructions (on Linux - not tried on any other platforms), mostly cribbed from the rust-etcd repo:
Start etcd with `docker run --net=host -d quay.io/coreos/etcd:v2.2.0`, run the program below, and watch the connections to port 2379 accumulate with `sudo netstat -plant | grep 2379` (eventually you'll run out of ports).

```rust
extern crate hyper;
extern crate tokio_core;
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate futures;
extern crate tokio_timer;

use std::error::Error;
use std::time::Duration;

use futures::future::Future;
use futures::Stream;
use hyper::{Client, Uri};
use tokio_core::reactor;
use tokio_timer::{Timer, TimeoutError};

// Dummy error type to handle this
#[derive(Debug)]
enum MyError {
    Timeout,
    Other(String),
}

impl std::fmt::Display for MyError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        write!(f, "{}", self.description())
    }
}

impl Error for MyError {
    fn description(&self) -> &str {
        match *self {
            MyError::Timeout => "Timeout",
            MyError::Other(ref s) => s,
        }
    }
}

impl<T> From<TimeoutError<T>> for MyError {
    fn from(_error: TimeoutError<T>) -> MyError {
        MyError::Timeout
    }
}

impl From<hyper::Error> for MyError {
    fn from(error: hyper::Error) -> MyError {
        MyError::Other(error.description().to_string())
    }
}

fn main() {
    env_logger::init().unwrap();
    let mut core = reactor::Core::new().unwrap();
    let client = Client::new(&core.handle());
    let mut idx = 0;
    loop {
        let url_str = format!(
            "http://localhost:2379/v2/keys/foo?waitIndex={}&wait=true&recursive=true",
            idx
        );
        let url = url_str.parse::<Uri>().unwrap();
        debug!("Kicking off watch for index: {}", idx);
        let timer = Timer::default();
        let cur_idx = idx;
        let work: Box<Future<Item = (), Error = MyError>> = Box::new(
            timer.timeout(
                client
                    .get(url)
                    .map_err(|e| MyError::Other(e.description().to_string()))
                    .and_then(|response| {
                        let status = response.status();
                        response
                            .body()
                            .concat2()
                            .map_err(|e| MyError::Other(format!("Body parse error: {}", e)))
                            .and_then(move |_body| {
                                if status == hyper::StatusCode::Ok {
                                    Ok(())
                                } else {
                                    Err(MyError::Other("Bad response from etcd".to_string()))
                                }
                            })
                    })
                    .map(|_| {
                        debug!("Completed watch for index {}", cur_idx);
                    }),
                Duration::from_secs(3),
            ),
        );
        let res = core.run(work);
        match res {
            Ok(_) => {
                debug!("Spotted change from etcd");
                // Advance here: an `idx += 1` inside the `move` closure above
                // would only bump that closure's private copy of `idx`.
                idx += 1;
            }
            Err(MyError::Timeout) => info!("Timeout"),
            Err(MyError::Other(e)) => error!("Error from etcd: {}", e),
        }
    }
}
```
no_proto isn't the default.
Ah, apologies, so it isn't (the double negative confused me!). I just tried enabling no_proto() and it makes no difference.
Does the response that comes back include a response body?
PS, don't call Timer::default() in a loop. That is creating a worker thread every single time. Create it once outside the loop.
@seanmonstar No, no body at all - it's a polling GET that returns when a change is detected (see under "Waiting for a change" at https://coreos.com/etcd/docs/latest/v2/api.html), and it's expected in our real use case that most of the time it would time out with no body.
Thanks for the performance tip on Timer::default(), though that isn't the cause.
I've tried doing some code reading and debugging but I'm finding the hyper code quite tricky to get my head around!
This has been fixed for Clients with the no_proto config set. The new dispatcher will soon be the default, fixing this for everyone.
Sorry to dispute, but after pulling the latest master (8f6931b3) and switching to no_proto(), I still see the same problem with the above script, so there must be another bug.
Would you prefer this reopened or a new issue?
In fact, I can repro on master by patching your new test as follows. The key is to return a response with chunked Transfer-Encoding, no Content-Length and no body. With this change, the drop_client_closes_idle_connections test hangs forever (because the connection never drops).
```diff
--- a/tests/client.rs
+++ b/tests/client.rs
@@ -673,8 +673,9 @@ mod dispatch_impl {
 sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
 let mut buf = [0; 4096];
 sock.read(&mut buf).expect("read 1");
- let body =[b'x'; 64];
- write!(sock, "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", body.len()).expect("write head");
+ // let body =[b'x'; 64];
+ let body = vec![];
+ write!(sock, "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n").expect("write head");
 let _ = sock.write_all(&body);
 let _ = tx1.send(());
```
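The same server behaviour can be reproduced without hyper. Below is a std-only Rust sketch (the function `headers_then_silence` is hypothetical, not part of hyper's test suite) that, like the patched test, answers with `Transfer-Encoding: chunked` headers and then sends nothing at all. A client waiting for the body can only give up via its own timeout.

```rust
use std::io::{self, BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

/// Hypothetical helper mirroring the patched test's server: it answers with
/// chunked response headers and then sends no chunks (not even the final
/// `0\r\n\r\n`). Returns the status line the client read, plus the outcome of
/// the client's attempt to read the first body byte.
fn headers_then_silence(wait: Duration) -> io::Result<(String, Result<usize, io::ErrorKind>)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let server = thread::spawn(move || -> io::Result<()> {
        let (mut sock, _) = listener.accept()?;
        let mut buf = [0u8; 4096];
        sock.read(&mut buf)?; // read (and ignore) the request
        sock.write_all(b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n")?;
        // Hold the connection open, mid-response, for longer than the client waits.
        thread::sleep(wait * 2);
        Ok(())
    });

    let mut stream = TcpStream::connect(addr)?;
    stream.set_read_timeout(Some(wait))?;
    let request = format!("GET / HTTP/1.1\r\nHost: {}\r\n\r\n", addr);
    stream.write_all(request.as_bytes())?;

    let mut reader = BufReader::new(stream);
    let mut status = String::new();
    reader.read_line(&mut status)?;
    // Skip the remaining header lines up to the blank line (or EOF).
    loop {
        let mut line = String::new();
        if reader.read_line(&mut line)? == 0 || line == "\r\n" {
            break;
        }
    }

    // The body never starts, so this read can only end via the read timeout.
    let mut first = [0u8; 1];
    let body = reader.read(&mut first).map_err(|e| e.kind());

    server.join().expect("server thread panicked")?;
    Ok((status.trim_end().to_string(), body))
}
```

Here the client gets the headers immediately and then blocks until its read timeout, which is the position the `Timeout`-wrapped `Client.get()` from the original report is in when the timer fires. Note that std documents read timeouts as platform-dependent: Unix typically reports `WouldBlock`, Windows `TimedOut`.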
Ah yes, this is different! Not idle connections, but rather an active connection when the Response or the Body related to it has been dropped.
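To illustrate the distinction: an idle connection has finished its last message and could go back to a pool, while an active one still has unread response bytes on the wire. The sketch below models one plausible policy with hypothetical types (not hyper's implementation): close any connection whose body handle is dropped before the message is complete, since leftover bytes would otherwise be parsed as the start of the next response and nobody remains to read them.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// How far the response body had been read when its handle was dropped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BodyState {
    /// The whole message was read (e.g. the zero-length last chunk arrived).
    Complete,
    /// Headers arrived but the body is still in flight.
    InProgress,
}

/// What the (hypothetical) connection owner does with the socket afterwards.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Disposition {
    /// Nothing of this response is left on the wire: safe to reuse.
    ReturnToPool,
    /// Unread body bytes remain and nobody will read them: shut it down.
    Close,
}

fn on_body_dropped(state: BodyState) -> Disposition {
    match state {
        BodyState::Complete => Disposition::ReturnToPool,
        BodyState::InProgress => Disposition::Close,
    }
}

/// Hypothetical stand-in for a response body handle. Dropping it (for example
/// because a timeout dropped the future that owned it) notifies the connection.
struct BodyHandle {
    state: BodyState,
    conn_events: Rc<RefCell<Vec<Disposition>>>,
}

impl Drop for BodyHandle {
    fn drop(&mut self) {
        self.conn_events.borrow_mut().push(on_body_dropped(self.state));
    }
}
```

Tying the decision to `Drop` means it runs no matter why the handle went away, including when a timeout simply discards the future that owned it.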
Ok, new patch fixes the test case you provided.
Yes, that works! Thanks so much for the speedy fixes.