Vector: Update `stdin` source to use a thread instead of using the tokio version

Created on 25 Sep 2019  路  5Comments  路  Source: timberio/vector

Related to #546 and a bug in tokio where blocking annotations can not be shutdown via a drop in the task. This causes stdin sources to hang when a user attempts to shut it down.

Specification

Update the stdin source to use a similar strategy to what the file source uses by spawning a thread and using stdin directly. Then using the out channel passed into the SourceConfig::build fn to send events back into the topology. This will stop the stdin source from blocking shutdown.

To send events down the out channel you should use https://docs.rs/futures/0.1.29/futures/sync/mpsc/struct.Sender.html#method.try_send to send the event.

To fetch lines off of stdin, https://doc.rust-lang.org/std/io/struct.Stdin.html#method.read_line should be used. Once, there is a String this should be converted into an Event as seen here.

good first issue stdin bug

Most helpful comment

@LucioFranco It makes perfect sense, thanks alot I will try to apply it in a couple of days and get back to you

All 5 comments

@LucioFranco I am willing to contribute to this issue if possible

@AlyHKafoury sure! Let me know if you need more info or help!

@LucioFranco a little info about where to start to tackle the stdin source module would be great to get me started

@AlyHKafoury sure!

So here is the main file https://github.com/timberio/vector/blob/master/src/sources/stdin.rs

This is where we create the stdin stream from tokio https://github.com/timberio/vector/blob/master/src/sources/stdin.rs#L43.

Ideally what we would probably want to do here https://github.com/timberio/vector/blob/master/src/sources/stdin.rs#L55 is to change this to something like so:

    Box::new(future::lazy(move || {
        info!("Capturing STDIN");

        let host_key = config.host_key.clone().unwrap_or(event::HOST.to_string());
        let hostname = hostname::get_hostname();

        let (tx, rx) = futures::sync::mpsc::channel(1024);

        thread::spawn(move || {
            let mut stdin = std::io::stdin();

            loop {
                let mut buf = String::new();
                if let Err(e) = stdin.read_line(&mut buf) { ... }
                tx.try_send(buf).unwrap();
            }
        });

        let source = rx
        .map(move |line| create_event(line, &host_key, &hostname))
        .map_err(|e| error!("error reading line: {:?}", e))
        .forward(out.sink_map_err(|e| error!("Error sending in sink {}", e)))
        .map(|_| info!("finished sending"));

        source
    }))

I believe _something_ like this should work, I didn't check any of the types but this should give you a basic framework to work off of.

Let me know if this makes sense!

@LucioFranco It makes perfect sense, thanks alot I will try to apply it in a couple of days and get back to you

Was this page helpful?
0 / 5 - 0 ratings

Related issues

binarylogic picture binarylogic  路  4Comments

jhgg picture jhgg  路  4Comments

a-rodin picture a-rodin  路  3Comments

LucioFranco picture LucioFranco  路  3Comments

LucioFranco picture LucioFranco  路  3Comments