Related to #546 and a bug in tokio where blocking annotations can not be shutdown via a drop in the task. This causes stdin sources to hang when a user attempts to shut it down.
Update the stdin source to use a similar strategy to what the file source uses by spawning a thread and using stdin directly. Then using the out channel passed into the SourceConfig::build fn to send events back into the topology. This will stop the stdin source from blocking shutdown.
To send events down the out channel you should use https://docs.rs/futures/0.1.29/futures/sync/mpsc/struct.Sender.html#method.try_send to send the event.
To fetch lines off of stdin, https://doc.rust-lang.org/std/io/struct.Stdin.html#method.read_line should be used. Once, there is a String this should be converted into an Event as seen here.
@LucioFranco I am willing to contribute to this issue if possible
@AlyHKafoury sure! Let me know if you need more info or help!
@LucioFranco a little info about where to start to tackle the stdin source module would be great to get me started
@AlyHKafoury sure!
So here is the main file https://github.com/timberio/vector/blob/master/src/sources/stdin.rs
This is where we create the stdin stream from tokio https://github.com/timberio/vector/blob/master/src/sources/stdin.rs#L43.
Ideally what we would probably want to do here https://github.com/timberio/vector/blob/master/src/sources/stdin.rs#L55 is to change this to something like so:
Box::new(future::lazy(move || {
info!("Capturing STDIN");
let host_key = config.host_key.clone().unwrap_or(event::HOST.to_string());
let hostname = hostname::get_hostname();
let (tx, rx) = futures::sync::mpsc::channel(1024);
thread::spawn(move || {
let mut stdin = std::io::stdin();
loop {
let mut buf = String::new();
if let Err(e) = stdin.read_line(&mut buf) { ... }
tx.try_send(buf).unwrap();
}
});
let source = rx
.map(move |line| create_event(line, &host_key, &hostname))
.map_err(|e| error!("error reading line: {:?}", e))
.forward(out.sink_map_err(|e| error!("Error sending in sink {}", e)))
.map(|_| info!("finished sending"));
source
}))
I believe _something_ like this should work, I didn't check any of the types but this should give you a basic framework to work off of.
Let me know if this makes sense!
@LucioFranco It makes perfect sense, thanks alot I will try to apply it in a couple of days and get back to you
Most helpful comment
@LucioFranco It makes perfect sense, thanks alot I will try to apply it in a couple of days and get back to you