Vector: New `aws_s3` source

Created on 11 Oct 2019  ·  17 comments  ·  Source: timberio/vector

Needs Source: AWS S3

Labels: should: rfc · aws · feature


All 17 comments

@ticon-mg, to clarify, you're referring to source archives? You can download those from the GitHub repo and releases in the interim.

Hi!
I'm talking about a source that reads log files from S3 so Vector can process them.

Ah, that makes more sense. Thanks!

+1

Use cases
Lots of AWS services write activity and security logs to S3 (CloudTrail, ELB access logs, VPC flow logs, AWS Config events, etc.). An AWS S3 source would let S3 act as a data pipeline here, with Vector as the data router receiving and routing new data.

Implementation

  • Similar to Splunk's S3 connector, I'd suggest using S3 event notifications written to an SQS queue.
  • The source would only need to know the SQS queue ARN/endpoint.
  • Since the queue messages contain the bucket name and object key, one source config could potentially read from multiple buckets.
  • Users can control what's sent to SQS/Vector through S3 notification filter rules.
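As a concrete illustration of those last two bullets: each SQS message body is the S3 event notification JSON, which names the bucket and object key for every new object. A minimal parsing sketch (the bucket and key values below are made up):

```python
import json

def parse_s3_event(sqs_body: str):
    """Extract (bucket, key) pairs from an S3 event notification
    delivered through SQS; each record names the bucket and object."""
    pairs = []
    for record in json.loads(sqs_body).get("Records", []):
        s3 = record["s3"]
        pairs.append((s3["bucket"]["name"], s3["object"]["key"]))
    return pairs

# Abridged notification body (only the fields used above):
body = json.dumps({
    "Records": [{
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "my-log-bucket"},
            "object": {"key": "AWSLogs/123456789012/elasticloadbalancing/log.gz"},
        },
    }]
})
print(parse_s3_event(body))
```

Because the bucket name rides along in every message, a single source pointed at one queue can serve many buckets.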

@zcapper
But do S3 event notifications allow processing data that is already stored in a bucket?
I'd guess polling is the more flexible solution, as implemented in Logstash.

@NikitaGl
You're right that the event notification approach wouldn't allow pre-existing data to be backfilled. (I do wonder how Logstash tracks state so it doesn't fetch files twice, e.g. after a restart).

I'm fond of the event notifications approach because it's stateless (in Vector) and could be scaled horizontally to work with very large buckets and inventories.

FWIW Splunk seems to have both generic and event-notification-based S3 connectors.

I think an interesting approach is to treat S3 like a file system. There are utilities that do exactly this; I'm not advocating them as a solution for Vector, but they're a useful mental frame to think through.

The right approach, and what Fluentd does, is the S3+SQS pattern: read the object key from an SQS message and then fetch the corresponding data. Fluentd has a plugin for this: https://github.com/fluent/fluent-plugin-s3. The issue with a generic polling plugin is that detecting new files requires listing all files; as the number of files grows, the time to discover new files grows, as does the API cost of continuous listing. SQS is far cheaper.
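The listing-cost point is easy to quantify: ListObjectsV2 returns at most 1,000 keys per request, so continuously rescanning a large bucket gets expensive fast. A small illustration (the object count and poll interval are made-up numbers):

```python
import math

def list_calls_per_day(num_objects: int, poll_interval_s: int) -> int:
    """ListObjectsV2 returns at most 1,000 keys per call, so one full
    scan of the bucket costs ceil(n / 1000) requests; multiply by the
    number of scans per day to get the daily LIST bill."""
    per_scan = math.ceil(num_objects / 1000)
    scans_per_day = 86400 // poll_interval_s
    return per_scan * scans_per_day

# 5 million objects, rescanned every 5 minutes:
print(list_calls_per_day(5_000_000, 300))  # 1440000 LIST calls/day
```

With SQS, by contrast, each ReceiveMessage call drains up to 10 notifications regardless of bucket size, so cost tracks the rate of new objects, not the total inventory.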

It would be great for the S3 source and S3 sink to share the same feature set, so that S3 can serve as a failover path for reliable delivery.
For example, in a distributed Vector configuration with Kafka, we could build failover through S3 if something goes wrong with Kafka.
If metrics from internal sinks were exposed, we could use conditions to detect problems with the Kafka sink and also start sending data to S3, either via SQS notifications or directly from Vector infrastructure agents.
The consumer-side Vector infrastructure would then simply consume from both Kafka and S3.
If Kafka stops working but S3 keeps flowing, the data still arrives (possibly with some delay through S3) and the pipeline keeps working.
For bigger systems at scale, this would be a very nice addition without building any custom solutions.
We'd need this S3 source and some improvements to the S3 sink. Please correct me if, once internal metrics from Vector sinks are fully announced, we'll have an option to trigger actions based on them as I described.
For the S3 source alone there are real-life examples, like Cloudflare delivering logs to multiple object stores (https://developers.cloudflare.com/logs/logpush/); fluent-plugin-s3 can already ingest those logs, so this feature would at least be a nice addition.

I like the idea.

In our context we aren't using Kafka; instead we use Firehose delivery to convert the records to ORC and deliver them to S3. But even with Fluentd, I found that when we're throttled by Firehose, the worker keeps taking data from the queue and the buffer keeps growing. This causes real data loss, and we're forced to run Firehose over-provisioned; the downside is that we get small chunks in S3.
It would be great if the source simply stopped taking new data from the queue in that situation. That solves the buffering problem completely and prevents the data loss in the first place.

Also, in our setup we use Fluentd's copy output to push data to Firehose, plus a grep filter so that we only push write events to Elasticsearch. We could probably use a more robust mechanism downstream to handle per-destination buffer sizes.

Something like

s3/sqs ---> vector-input --> 10MB buffer  ---> firehose
                         --> 200MB buffer ---> elasticsearch

P.S. Firehose ingestion is much faster than ES ingestion, and Firehose does its own buffering, so we could easily do vector-input -> firehose -> elasticsearch, but why not save some money if you can :)
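For the per-destination buffer split in the diagram above, Vector's existing per-sink buffer options already point in this direction. A hedged config sketch (the sink names are hypothetical and the sinks themselves are elided; `when_full = "block"` applies back-pressure upstream instead of dropping):

```toml
# Sketch only: illustrative sizes, not a recommendation.
[sinks.firehose_out.buffer]
  type = "memory"
  max_events = 5000      # small buffer: surface Firehose throttling quickly
  when_full = "block"    # stop reading from upstream rather than drop

[sinks.es_out.buffer]
  type = "disk"
  max_size = 209715200   # ~200 MB on disk for the slower Elasticsearch sink
  when_full = "block"
```

Combined with an SQS-driven source, blocking buffers would mean messages simply stay in the queue during a downstream outage, which is exactly the "stop taking new data" behavior described above.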

I am interested in attempting to build this source plugin. As the label says, it needs requirements. What would be a good minimal set of functionality/options for this source?

As input config options, I'd guess it's roughly a union of the File and HTTP sources:

[sources.my_source_id]
  type = "aws_s3" # required
  bucket = "s3://my-bucket" # required  - (I'm assuming we want to use the S3 protocol, and not generic http through the s3 http server) 
  ignore_older = 86400 # optional, no default, seconds
  include = ["/var/log/nginx/*.log"] # required
  start_at_beginning = false # optional, default
  polling_interval_ms = 30000
  encoding = "json"
  folder_template = "AWSLogs/<my account id>/elasticloadbalancing/us-west-2/%Y/%m/%d/*.log.gz" # strftime-style tokens

Edit

Note: I'm personally more interested in a polling model than in using SQS for events.
With judicious use of the folder template and a sane polling interval, we could limit the number of API calls per day. The plugin could later be enhanced with optional SQS subscriptions.
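A small sketch of how a folder template (a hypothetical option from the proposal above; the account id is a placeholder) could bound listing cost: expand the date tokens into today's prefix and list only under it. Note that strftime uses %d for day-of-month:

```python
from datetime import datetime, timezone

def prefix_for(template: str, when: datetime) -> str:
    """Expand the strftime tokens in a folder template into a concrete
    LIST prefix for the given time. Listing prefixes can't contain
    globs, so we stop at the first wildcard."""
    return when.strftime(template.split("*", 1)[0])

t = "AWSLogs/123456789012/elasticloadbalancing/us-west-2/%Y/%m/%d/*.log.gz"
print(prefix_for(t, datetime(2020, 7, 4, tzinfo=timezone.utc)))
# AWSLogs/123456789012/elasticloadbalancing/us-west-2/2020/07/04/
```

Each polling pass would then only list the current day's partition instead of the whole bucket, which is what keeps the daily API call count manageable.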

Thoughts?

Hi @rrichardson,

Thanks for your interest in contributing this! We've had a number of people ask for it.

Typically, for a new source like this where there are some decisions to be made, we like to do an RFC first to allow us to discuss and make those decisions before implementation; the goal being to reduce the amount of rework that might be needed if the discussion happened as part of the pull request. An example of this is the recently added apache_metrics source.

Would you be open to writing up a short RFC based on the template? I'm happy to focus on giving you support in the form of feedback and suggestions in the RFC and the PR. I think what you posted is a good start. I'd suggest thinking about:

  • How we would model the configuration in a way that makes it easy to add the S3 strategy of relying on SQS for bucket notifications (a strategy people have also asked for). If you feel up to it, it would be helpful to lay out a configuration model that covers both strategies (polling and SQS) but implement just the polling one you want for now, so we can avoid backwards-incompatible configuration changes later.
  • How we would keep track of which objects have already been ingested by Vector when polling. The file source checkpointing might be useful prior-art for this.
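A minimal sketch of what that polling-state tracking might look like, loosely analogous to the file source's checkpointing (the JSON file layout and ETag-based dedup here are assumptions for illustration, not Vector's actual design):

```python
import json
import os
import tempfile

class Checkpointer:
    """Persist which S3 objects (by key and ETag) have been ingested,
    so a restart resumes without fetching files twice."""

    def __init__(self, path: str):
        self.path = path
        self.seen = {}
        if os.path.exists(path):
            with open(path) as f:
                self.seen = json.load(f)

    def is_new(self, key: str, etag: str) -> bool:
        # A changed ETag means the object was rewritten; treat as new.
        return self.seen.get(key) != etag

    def mark(self, key: str, etag: str) -> None:
        self.seen[key] = etag
        tmp = self.path + ".tmp"  # write-then-rename for crash safety
        with open(tmp, "w") as f:
            json.dump(self.seen, f)
        os.replace(tmp, self.path)

# Demo: state survives a "restart" (re-opening the same file).
path = os.path.join(tempfile.mkdtemp(), "s3_checkpoints.json")
ckpt = Checkpointer(path)
ckpt.mark("logs/a.log.gz", "etag-1")
print(Checkpointer(path).is_new("logs/a.log.gz", "etag-1"))  # False
```

A real implementation would likely track a high-water mark on LastModified rather than every key, so the state stays bounded as the bucket grows.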

Thanks again! Let me know if I can offer any additional guidance.

@szibis Thanks for your notes too. We do have a new issue for "dead letter" behavior: #1772

@jszwedko - Thanks. I will put together an RFC. I am trying to wrap up a couple projects first, but I have this scheduled in my next sprint. :)

Some notes from another user:

One of the challenges with collection from S3 is that polling/scanning a bucket does not scale well; AWS only allows you to list the contents against a prefix (max 1000 objects at a time) and it's possible to see scanning times extending to hours, or even multiple days.

Three documented options exist for collection from S3 without needing to re-scan the bucket:

  1. SQS for pull-based notifications of new objects.
  2. SNS for push-based notifications.
  3. Configuring S3 to write details of each new object into DynamoDB, then polling DynamoDB for objects created since a given time.

SQS is probably the best approach for typical collectors; it relies on a client to connect to the SQS and retrieve details of new events. SNS is probably the best approach for SaaS-based collection; SNS can proactively notify a publicly accessible endpoint that a new object exists. DynamoDB is probably the best approach where more specific details need to be kept about events, or a permanent record maintained of what assets are available in a large S3 bucket (archive use case?).

  • Should work with programmatic AWS access (key + secret)
  • Should be possible to set bucket name, prefix, and filename match pattern; examples:
Prefix: AWSLogs/041856772135/CloudTrail/* (all CloudTrail logs)
Filename: *CloudTrail*.json.gz
  • Should support plain, gzip, zip, and Snappy/Zippy compression (formats supported by Kinesis output to S3).
  • Should have configurable polling interval to SQS
  • Should have logging/metrics of number of objects found and processed
  • Provide support for reading/processing CloudTrail and CloudWatch logs. CloudWatch logs delivered via Kinesis are presented as multiple non-line-separated JSON blocks (no newlines in the file; note the repeated "{"messageType"" blocks). These should be split into their individual events (i.e. the logEvents array) AND the top-level metadata (owner, logGroup, logStream, subscriptionFilters) added to each event. CloudTrail logs have a similar format: arrays of "Records" requiring processing similar to CloudWatch via Kinesis.
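The CloudWatch-via-Kinesis splitting described in the last bullet can be sketched with a streaming JSON decoder (field names follow the CloudWatch Logs subscription format; the sample payload is fabricated):

```python
import json

def split_cloudwatch_payload(raw: str):
    """Split a file of concatenated (non-newline-delimited) CloudWatch
    Logs JSON blocks and flatten each logEvents entry, copying the
    block-level metadata onto every event."""
    decoder = json.JSONDecoder()
    events, idx = [], 0
    while idx < len(raw):
        block, idx = decoder.raw_decode(raw, idx)
        while idx < len(raw) and raw[idx].isspace():
            idx += 1
        meta = {k: block.get(k) for k in
                ("owner", "logGroup", "logStream", "subscriptionFilters")}
        for ev in block.get("logEvents", []):
            events.append({**meta, **ev})
    return events

# Two DATA_MESSAGE blocks back to back with no separator, as delivered:
raw = json.dumps({
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "my-group",
    "logStream": "my-stream",
    "subscriptionFilters": ["my-filter"],
    "logEvents": [{"id": "1", "timestamp": 0, "message": "hello"}],
}) * 2
print(len(split_cloudwatch_payload(raw)))  # 2
```

The same raw_decode loop handles CloudTrail files too; only the array name (Records instead of logEvents) and the metadata fields differ.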

Hey @rrichardson,

I'm actually going to be starting on an implementation of this source using SQS for bucket notifications this week. If you were also planning on working on this soon, I'd love to collaborate on an RFC and make sure I don't step on your toes with implementation. We have a discord (https://discord.gg/sfFzZ6) that we could use for more real-time communication.

Otherwise, I can ping you when the RFC is up (hopefully tomorrow) and you can leave any feedback. I'll plan to leave a hole in the vector configuration spec for the polling strategy that you can fill in later.

RFC: https://github.com/timberio/vector/pull/4197 . I proposed configuration specifying the SQS implementation as a "strategy" to allow room for specifying a polling strategy as well. cc/ @rrichardson
