Vector: Add joinable transforms

Created on 22 Oct 2019 · 8 comments · Source: timberio/vector

Motivation

As we further improve Vector, we would like to be able to "enrich" our data events with external metadata that may not be available at collection time in the source. Examples of this are enriching events based on the EC2 instance they come from, the Kubernetes pod they derive from, or the ECS task they are a part of.

Proposal

To accomplish this, we should provide the ability for our transforms to apply what we call a "stream table join", where we join some external dataset onto our events. At a basic level, Vector's transform trait already supports this, as can be seen with the [add_fields] transform.

Currently, our transform trait looks like this:

pub trait Transform: Send {
    fn transform(&mut self, event: Event) -> Option<Event>;

    fn transform_into(&mut self, output: &mut Vec<Event>, event: Event) {...}
}

This trait is then consumed via a futures::stream::Map stream combinator in the [topology transform builder]. This works well for transforms that already contain all their data at build time, but it does not support dynamically fetching metadata over time. Examples of this are watching for pod metadata updates via the Kubernetes API or fetching new labels from EC2 metadata. These are items that may change over the course of running Vector without the transform getting rebuilt.

To solve this, we must provide the ability to run a background task that can poll/collect new metadata and update some state (think hashmap or vec). This state can be updated via a simple RwLock that can be passed into the transform. The transform then only needs to acquire a short read lock to check the state and apply the metadata. The missing piece here is the lack of an executor passed into TransformConfig::build, which means we must lazily spawn the background task. While this works, it is a poor pattern, and it would be nicer to support this use case directly via the TransformConfig trait.

The change I suggest we make is to pass the tokio executor into TransformConfig::build to allow us to spawn a background task that can update some RwLock over time based on some external IO.

#[typetag::serde(tag = "type")]
pub trait TransformConfig: core::fmt::Debug {
    fn build(&self, exec: tokio::executor::Executor) -> crate::Result<Box<dyn transforms::Transform>>;

    fn input_type(&self) -> DataType;

    fn output_type(&self) -> DataType;
}

This would then allow us to implement a transform like so:

pub struct MyTransformConfig {}

pub struct MyTransform {
    state: Arc<RwLock<HashMap<Atom, Bytes>>>,
}

impl TransformConfig for MyTransformConfig {
    fn build(&self, exec: tokio::executor::Executor) -> crate::Result<Box<dyn Transform>> {
        let state = Arc::new(RwLock::new(HashMap::new()));

        // The `Background` struct is some `Future` that can
        // loop and poll some external resource, either via a timer
        // ticker or via some "watch" type api. It will then
        // acquire a write lock on the state and update it.
        //
        // Note: we must ensure that the write lock is only acquired for
        // a very short period to avoid other tasks locking up the runtime
        // or blocking the transform from operating.
        let bg = Background::new(state.clone());

        exec.spawn(bg);

        Ok(Box::new(MyTransform { state }))
    }

    // == snip ==
}

impl Transform for MyTransform {
    fn transform(&mut self, mut event: Event) -> Option<Event> {
        let state = self.state.read().expect("lock poisoned");

        update_event(&mut event, state);

        Some(event)
    }
}
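To make the shape of the `Background` task concrete, here is a minimal, self-contained sketch of the shared-state pattern the snippet above relies on. It is not Vector's actual code: it uses a plain thread and std types in place of a tokio `Future` and executor, and the `spawn_background`/`latest_instance_id` names and the `instance_id` key are illustrative only.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

// Shared join-table state: writers are the background task,
// readers are the transform on the hot path.
type State = Arc<RwLock<HashMap<String, String>>>;

// Stand-in for the `Background` future: periodically "polls" an
// external metadata source and swaps the result into the state,
// holding the write lock only long enough to insert.
fn spawn_background(state: State) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for i in 0..3 {
            // Pretend this value came from external IO (EC2/Kubernetes).
            let fetched = format!("i-{}", i);
            let mut guard = state.write().expect("lock poisoned");
            guard.insert("instance_id".to_string(), fetched);
            drop(guard); // release before sleeping: never hold a lock across waits
            thread::sleep(Duration::from_millis(5));
        }
    })
}

// The transform side: a short read lock to consult the table.
fn latest_instance_id(state: &State) -> Option<String> {
    state.read().expect("lock poisoned").get("instance_id").cloned()
}

fn main() {
    let state: State = Arc::new(RwLock::new(HashMap::new()));
    spawn_background(state.clone()).join().unwrap();
    assert_eq!(latest_instance_id(&state), Some("i-2".to_string()));
}
```

The same division of labor applies in the proposal: only the background task ever takes the write lock, so the transform's read locks stay cheap and contention-free.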

This implementation gives us great freedom in how we might implement dynamic transforms in the future. This design was derived from how tokio_sync::watch is implemented and from how fluentd implements its stream-table-join transforms.

Even though this specification is long, the solution itself is very simple, and it provides a correct framework for implementing this type of transform on a tokio/Rust futures executor.

cc @lukesteensen

topology enhancement

All 8 comments

Passing an executor to build seems fine with me. The more interesting question is how to actually do the join.

It basically boils down to what you do when you have no entry in the table for the lookup key from the incoming event. The simple thing to do is just accept the failed join and pass the event through. Alternatively, you could take this as a signal that your table could be outdated and try to fetch new data. Since fetching is async, this would require either a rework of the Transform trait or a separate trait for these types of joins.
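The "accept the failed join and pass the event through" option can be sketched in a few lines. This is an illustrative stand-in, not Vector's event model: `Event` is simplified to a flat field map, and the `enrich` helper and `pod_name`/`pod_namespace` fields are hypothetical names.

```rust
use std::collections::HashMap;

// Simplified stand-ins for a log event and the join table.
type Event = HashMap<String, String>;
type Table = HashMap<String, HashMap<String, String>>;

// Attempt the stream-table join: on a hit, merge the metadata into the
// event (without clobbering existing fields); on a miss, pass the
// event through unchanged.
fn enrich(mut event: Event, table: &Table, key_field: &str) -> Event {
    if let Some(metadata) = event.get(key_field).and_then(|key| table.get(key)).cloned() {
        for (k, v) in metadata {
            event.entry(k).or_insert(v);
        }
    }
    event
}

fn main() {
    let mut pod_meta = HashMap::new();
    pod_meta.insert("pod_namespace".to_string(), "kube-system".to_string());
    let mut table: Table = HashMap::new();
    table.insert("pod-abc".to_string(), pod_meta);

    // Known key: metadata is joined on.
    let mut event: Event = HashMap::new();
    event.insert("pod_name".to_string(), "pod-abc".to_string());
    let enriched = enrich(event, &table, "pod_name");
    assert_eq!(enriched.get("pod_namespace").map(String::as_str), Some("kube-system"));

    // Unknown key: the failed join is accepted and the event passes through.
    let mut unknown: Event = HashMap::new();
    unknown.insert("pod_name".to_string(), "pod-xyz".to_string());
    assert_eq!(enrich(unknown, &table, "pod_name").len(), 1);
}
```

The alternative discussed next, treating a miss as a trigger to re-fetch, cannot be expressed in this synchronous shape; that is exactly why it would force a rework of the Transform trait.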

I usually advocate for starting with the simple thing, but we should be aware that this makes the transform more likely to fail (i.e. early logs from a new pod may not get metadata added). If that's fine, we can certainly go with it.

Eventually, though, I do think we'll want a more dependable join mechanism. I'm just not sure when it will make the most sense to make that investment.

I'd also be interested to see how other tools handle this. Do they just accept the race condition between processing the first logs from a new pod and polling for its metadata? Or it's possible that this isn't something that comes up in practice.

It basically boils down to what you do when you have no entry in the table for the lookup key from the incoming event.

In my mind, it should just pass through the event. This seems the most logical.

I usually advocate for starting with the simple thing, but we should be aware that this makes the transform more likely to fail (i.e. early logs from a new pod may not get metadata added). If that's fine, we can certainly go with it.

So this leads me to think we should maybe update the build fns to return a future as well, so we can have a true "build" phase that allows us to async-fetch data. For example, we could fetch the total list of pods before the topology is ready. This would mean that the only way to not have an enriched event is the pod not existing, barring eventual consistency/race conditions, etc.

To further back this, https://github.com/timberio/vector/pull/1078/files#diff-8b3f1c2130e7e20a393bcdc15961f87bR45 needs to do some async io as well on build. So this pattern could be more useful throughout the entire codebase.

I'd also be interested to see how other tools handle this. Do they just accept the race condition between processing the first logs from a new pod and polling for its metadata? Or it's possible that this isn't something that comes up in practice.

From my basic knowledge right now it looks like most attempt to completely fetch an initial state before letting events come through.

So this leads me to think we should maybe update the build fns to return a future as well. So we can have a true, "build" phase that can allow us to async fetch data.

Sorry, I wasn't talking about build time. I'm assuming that pods will come and go while vector is up and running, so there is a race condition between learning about a new pod's metadata and receiving an event from its first logs.

Sorry, I wasn't talking about build time. I'm assuming that pods will come and go while vector is up and running, so there is a race condition between learning about a new pod's metadata and receiving an event from its first logs.

Yeah, so I think the best we can do here is accept that some pods might miss the metadata, which I think is fine. In reality, I don't think this will be an issue. For the case of kube, we should be notified of a new pod before we get logs by using the watch api that is provided. This will allow us to watch for pods that might be starting up and thus we can add these to the join state. In 99% of cases, I believe we will have the pod metadata before any events come through.

If it will notify us when things change that definitely helps. This should also be pretty easy to test.

Yup, so if our background task is just a loop, it can be triggered either by a timer or by an inbound event. Then on the transform side, all it needs to do is acquire a read lock; no need to wake, etc. Yup, testing should be very easy here.
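The event-triggered variant of that loop can be sketched with a channel standing in for the watch API. Again a hedged, std-only illustration: the `Update` enum, `run_updater` function, and field names are assumptions, and in Vector the receiver would be a stream polled by the spawned future rather than a blocking mpsc channel.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::thread;

type State = Arc<RwLock<HashMap<String, String>>>;

// Updates arriving from a hypothetical "watch" API.
enum Update {
    Upsert(String, String),
    Shutdown,
}

// The background loop blocks on the channel and applies each update
// under a short write lock; the transform side only ever reads.
fn run_updater(state: State, rx: mpsc::Receiver<Update>) {
    for update in rx {
        match update {
            Update::Upsert(k, v) => {
                state.write().expect("lock poisoned").insert(k, v);
            }
            Update::Shutdown => break,
        }
    }
}

fn main() {
    let state: State = Arc::new(RwLock::new(HashMap::new()));
    let (tx, rx) = mpsc::channel();
    let bg = {
        let state = state.clone();
        thread::spawn(move || run_updater(state, rx))
    };
    // A "pod added" notification arrives before its first logs do.
    tx.send(Update::Upsert("pod-abc".into(), "kube-system".into())).unwrap();
    tx.send(Update::Shutdown).unwrap();
    bg.join().unwrap();
    assert_eq!(
        state.read().unwrap().get("pod-abc").map(String::as_str),
        Some("kube-system")
    );
}
```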

@LucioFranco I'm assuming based on this conversation that we're in agreement to do this? If so, feel free to remove the needs: approval label and slot it into your work.

Also, this seems like a requirement for #1249, so I'm curious how we accomplished that.

@binarylogic regarding

Also, this seems like a requirement for #1249, so I'm curious how we accomplished that.

It's almost the opposite: #1249 parses additional data from log file paths to provide a key on which the table join can be performed, in the case of Kubernetes. This issue and #1249 are both necessary to implement #1072.
