Hyper: graceful shutdown does not work when I use server and client at the same time

Created on 5 Oct 2018 · 2 Comments · Source: hyperium/hyper

I'm trying out graceful shutdown.
The following works:

fn main() {
    let (tx, rx) = ::futures::sync::oneshot::channel::<()>();
    let server = ::hyper::Server::bind(&([127, 0, 0, 1], 3000).into())
        .serve(move || service_fn(handler))
        .with_graceful_shutdown(rx);
    tokio::run(
        future::ok(())
            // server
            .join(server.map_err(|err| eprintln!("server error: {:?}", err)))
            // shutdown timer
            .join(
                ::tokio_timer::sleep(::std::time::Duration::from_millis(10))
                    .map_err(|err| eprintln!("timer error: {:?}", err))
                    .and_then(move |()| {
                        println!("start shutdonwn");
                        tx.send(())
                            .into_future()
                            .map_err(|err| eprintln!("tx error: {:?}", err))
                    }),
            )
            .map(|_| ()),
    );
    println!("shutdonwn");
}

This prints:

start shutdonwn
shutdonwn

However, the program does not stop when I also send an HTTP request from a client at the same time.

fn main() {
    let (tx, rx) = ::futures::sync::oneshot::channel::<()>();
    let server = ::hyper::Server::bind(&([127, 0, 0, 1], 3000).into())
        .serve(move || service_fn(handler))
        .with_graceful_shutdown(rx);
    tokio::run(
        future::ok(())
            // server
            .join(server.map_err(|err| eprintln!("server error: {:?}", err)))
            // client
            .join(
                ::hyper::Client::new()
                    .get("http://localhost:3000".parse().unwrap())
                    .map_err(|err| eprintln!("client error: {:?}", err))
                    .and_then(|res| {
                        println!("status: {}", res.status());
                        res.into_body()
                            .concat2()
                            .map_err(|err| eprintln!("concat error: {:?}", err))
                    })
                    .and_then(|body| {
                        ::std::str::from_utf8(&body)
                            .map(|a| a.to_string())
                            .into_future()
                            .map_err(|err| eprintln!("utf8 error: {:?}", err))
                    })
                    .and_then(|body| {
                        println!("body: {}", body);
                        Ok(()).into_future()
                    }),
            )
            // shutdown timer
            .join(
                ::tokio_timer::sleep(::std::time::Duration::from_millis(10))
                    .map_err(|err| eprintln!("timer error: {:?}", err))
                    .and_then(move |()| {
                        println!("start shutdonwn");
                        tx.send(())
                            .into_future()
                            .map_err(|err| eprintln!("tx error: {:?}", err))
                    }),
            )
            .map(|_| ()),
    );
    println!("shutdonwn"); // not printed
}

Expected output:

status: 200 OK
body: hello
start shutdonwn
shutdonwn

Actual output:

status: 200 OK
body: hello
start shutdonwn


Here is the full sample code:

extern crate env_logger;
extern crate futures;     // 0.1.24
extern crate http;        // 0.1.13
extern crate hyper;       // 0.12.10
extern crate tokio;       // 0.1.8
extern crate tokio_timer; // 0.2.6 // 0.5.13

use futures::future;
use futures::prelude::*;
use hyper::service::service_fn;
use hyper::{Body, Method, Request, Response, StatusCode};

fn handler(
    req: Request<Body>,
) -> impl Future<Item = Response<Body>, Error = ::hyper::Error> + Send + 'static {
    let mut res = Response::new(Body::empty());
    match (req.method(), req.uri().path()) {
        (&Method::GET, "/") => {
            // Set the response body; the assignment needs no `let _ =` binding.
            *res.body_mut() = Body::from("hello");
            future::ok(res)
        }
        _ => {
            *res.status_mut() = StatusCode::NOT_FOUND;
            future::ok(res)
        }
    }
}

fn main() {
    let _ = env_logger::try_init();
    let (tx, rx) = ::futures::sync::oneshot::channel::<()>();
    let server = ::hyper::Server::bind(&([127, 0, 0, 1], 3000).into())
        .serve(move || service_fn(handler))
        .with_graceful_shutdown(rx);
    tokio::run(
        future::ok(())
            // server
            .join(server.map_err(|err| eprintln!("server error: {:?}", err)))
            // client
            .join(
                ::hyper::Client::new()
                    .get("http://localhost:3000".parse().unwrap())
                    .map_err(|err| eprintln!("client error: {:?}", err))
                    .and_then(|res| {
                        println!("status: {}", res.status());
                        res.into_body()
                            .concat2()
                            .map_err(|err| eprintln!("concat error: {:?}", err))
                    })
                    .and_then(|body| {
                        ::std::str::from_utf8(&body)
                            .map(|a| a.to_string())
                            .into_future()
                            .map_err(|err| eprintln!("utf8 error: {:?}", err))
                    })
                    .and_then(|body| {
                        println!("body: {}", body);
                        Ok(()).into_future()
                    }),
            )
            // shutdown timer
            .join(
                ::tokio_timer::sleep(::std::time::Duration::from_millis(10))
                    .map_err(|err| eprintln!("timer error: {:?}", err))
                    .and_then(move |()| {
                        println!("start shutdonwn");
                        tx.send(())
                            .into_future()
                            .map_err(|err| eprintln!("tx error: {:?}", err))
                    }),
            )
            .map(|_| ()),
    );
    println!("shutdonwn");
}

All 2 comments

I just tried this out, and while your example code does hang like you said, the issue isn't with the graceful shutdown in the server. It is indeed closing the connection to the client. The problem is that the client has spawned an interval to clear out idle connections in its pool, and tokio::run is blocking until that task is finished. Since tokio::run is blocked, the destructor for hyper::Client can't run (until the function scope exits).

To verify that was the issue, I wrapped the full future chain inside a future::lazy(|| { .. }), so that the Client is dropped at the end of the lazy closure...
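
The scoping effect described above can be sketched with the standard library alone. This is only an analogy under stated assumptions, not hyper's implementation: `PoolClient`, `new_client`, `run_scoped`, and `task_done_while_client_alive` are hypothetical names, a thread stands in for the pool's idle-connection interval, and `join` stands in for tokio::run waiting on spawned tasks. The closure in `run_scoped` plays the role of the future::lazy closure, so the client is dropped before the wait begins.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `hyper::Client`: it owns the sending half of a
/// channel, and a background task keeps running until that half is dropped.
struct PoolClient {
    _keep_alive: Sender<()>,
}

/// Creates the client and spawns its background task (a stand-in for the
/// pool's idle-connection interval).
fn new_client() -> (PoolClient, JoinHandle<()>) {
    let (tx, rx) = channel::<()>();
    let task = thread::spawn(move || {
        // `recv` returns Err once every Sender is gone, i.e. once the
        // client has been dropped; only then does the task end.
        while rx.recv().is_ok() {}
    });
    (PoolClient { _keep_alive: tx }, task)
}

/// Mirrors the hanging version: while the client is still alive in the
/// caller's scope, the background task cannot finish.
fn task_done_while_client_alive() -> bool {
    let (client, task) = new_client();
    let done = task.is_finished(); // the Sender still exists, so not done
    drop(client); // release it so this demo itself does not hang
    task.join().unwrap();
    done
}

/// Mirrors the `future::lazy` approach: the client lives only inside the
/// closure, so it is dropped before the caller waits for the task.
fn run_scoped() -> bool {
    let task = (|| {
        let (_client, task) = new_client();
        // ... the client would be used here ...
        task
        // `_client` is dropped here, at the end of the closure.
    })();
    task.join().is_ok()
}
```

In this model, `run_scoped` returns promptly because the client is gone by the time `join` is called. Joining while the client is still in scope would block forever, which is the same shape as the hang in the sample code.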

That solved it. Thank you.
