Fluent-bit: Add Pipeline to Elasticsearch Output Plugin

Created on 4 Jul 2017  路  13Comments  路  Source: fluent/fluent-bit

Hello,

I wonder if you might consider the option to add Pipeline option to the ES Output Plugin.

This would enable more complex processing of data using the Pipeline functionality.

Some more info:
https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html

Here is an article that mentions how Logstash can be configured to use it:
https://www.elastic.co/blog/new-way-to-ingest-part-1

And here are a couple of options how the Pipeline can be specified in the request:
https://stackoverflow.com/questions/41909936/elastic-search-bulk-api-pipeline-and-geo-ip

I know that some of the use cases of Pipelines will be covered with the upcoming release of new plugins, e.g. the upcoming filter_parser that would enable parsing string into JSON. But it looks like the ES Pipelines is currently somewhat more flexible and powerful.

I guess implementing this functionality could be done relatively fast, enable users to get more benefits from fluent-bit faster and allowing the development team to concentrate on some other features.

Thanks,
Vlad

Implemented enhancement

Most helpful comment

awesome, 0.12 will be available shortly.

thanks again.

All 13 comments

@spatialvlad thanks for the idea.

One concern that I see with this optional approach, is that Fluent Bit will need to issue one HTTP request per document/record. At the moment we use Bulk API where we ingest multiple documents in one request.

Do you know if this Pipeline stuff allows bulk import ?

@edsiper ,

Switching to one request per document would be not very good.

Luckily, it looks like the Pipelines work with the bulk import.
Here is a quote from the ES page that describes Pipeline:

To use a pipeline, you simply specify the pipeline parameter on an index or bulk request to tell the ingest node which pipeline to use.

The answer in the StackOverflow link I shared above has an additional example:
https://stackoverflow.com/a/41910185

Thanks for your prompt reply.

I've pushed a commit that aims to support pipeline support, it's on out_es_pipeline branch.

To test it in the output configuration just set a value for _pipeline_ .

Here it worked well, let me know how it works in your side.

Wow, that was fast, thank you!

I would be glad to check the branch on my machine. But I was not able so far to successfully build a Docker image with this branch. Do you have somewhere an instruction on what to modify in the Dockerfile to use some development branch? I tried to use the Dockerfile from the the 0.12-dev branch, modifying the FLB_TARBALL url. But apparently, it's not enough.

I get an error in the process:

/bin/sh: 1: cd: can't cd to fluent-bit-master/build/
ERROR: Service 'fluentd' failed to build: The command '/bin/sh -c apt-get -qq update     && apt-get install -y -qq        ca-certificates        build-essential        cmake        make        sudo        wget        unzip     && apt-get install -y -qq --reinstall lsb-base lsb-release     && wget -O "/tmp/fluent-bit-master.zip" ${FLB_TARBALL}     && cd /tmp && unzip "fluent-bit-master.zip"     && cd "fluent-bit-master"/build/     && cmake -DFLB_DEBUG=On -DFLB_TRACE=On -DFLB_JEMALLOC=On -DFLB_BUFFERING=On ../     && make     && install bin/fluent-bit /fluent-bit/bin/     && apt-get remove --purge --auto-remove -y -qq
      build-essential        cmake        make        wget        unzip     && rm -rf /tmp/*' returned a non-zero code: 2

I've rebased the changes on GIT master, try again.

Thanks.

For some reasons, it does not seem to be applying a pipeline.
Could it be that this line in the pull request should be different?
https://github.com/fluent/fluent-bit/commit/e63ec9550b323ab22f7435edc3fd8ea641ae1934

I guess it should be:
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/_bulk?pipeline=%s", tmp);
instead of
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/_bulk/?pipeline=%s", tmp);

It should be
/_bulk? instead of /_bulk/?

I tested here:

$ curl http://127.0.0.1:9200/fluent-bit/_search?pretty
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "fluent-bit",
        "_type" : "flb_type",
        "_id" : "AV0OvxVTIOxPTN-detZ8",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2017-07-04T17:56:26.0Z",
          "rand_value" : "10781455129980707414",
          "foo" : "bar"
        }
      },
      {
        "_index" : "fluent-bit",
        "_type" : "flb_type",
        "_id" : "AV0OzZeNIOxPTN-detZ-",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2017-07-04T18:12:17.0Z",
          "rand_value" : 15417425886927737057
        }
      }
    ]
  }
}

From a HTTP protocol perspective there is no distinction between the ending slash for bulk API

My pipeline file is:

{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}

I tested my configuration.

  1. Regular test for using bulk:
POST /log/docker/_bulk
{"index":{"_id":"1"}}
{"@timestamp": "2017-07-04T19:14:16.0Z","log": """{"test":7,"someKey":"value+7","time":"2017-07-04T19:14:16.149Z"}""" }
{"index":{"_id":"2"}}
{"@timestamp": "2017-07-04T19:14:16.0Z","log": """{"test":8,"someKey":"value+8","time":"2017-07-04T19:14:16.151Z"}""" }

Content added to the log index:

GET /log/_search?pretty=true&q=*:*
{
  "took": 12,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "log",
        "_type": "docker",
        "_id": "2",
        "_score": 1,
        "_source": {
          "@timestamp": "2017-07-04T19:14:16.0Z",
          "log": """{"test":8,"someKey":"value+8","time":"2017-07-04T19:14:16.151Z"}"""
        }
      },
      {
        "_index": "log",
        "_type": "docker",
        "_id": "1",
        "_score": 1,
        "_source": {
          "@timestamp": "2017-07-04T19:14:16.0Z",
          "log": """{"test":7,"someKey":"value+7","time":"2017-07-04T19:14:16.149Z"}"""
        }
      }
    ]
  }
}

  1. I'm adding my pipeline:
PUT _ingest/pipeline/mypipeline
{
  "description" : "Convert String to JSON",
  "processors" : [
    {
        "json" : {
          "field" : "log",
          "target_field" : "parsed_log"
        }
    }
  ]
}
  1. I'm addind same content, but specifying the pipeline this time:
POST /log/docker/_bulk/?pipeline=mypipeline
{"index":{"_id":"1"}}
{"@timestamp": "2017-07-04T19:14:16.0Z","log": """{"test":7,"someKey":"value+7","time":"2017-07-04T19:14:16.149Z"}""" }
{"index":{"_id":"2"}}
{"@timestamp": "2017-07-04T19:14:16.0Z","log": """{"test":8,"someKey":"value+8","time":"2017-07-04T19:14:16.151Z"}""" }

Indexed content (I deleted old records before). You can see that the "parsed_log" is added:

GET /log/_search?pretty=true&q=*:*
{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "log",
        "_type": "docker",
        "_id": "2",
        "_score": 1,
        "_source": {
          "@timestamp": "2017-07-04T19:14:16.0Z",
          "log": """{"test":8,"someKey":"value+8","time":"2017-07-04T19:14:16.151Z"}""",
          "parsed_log": {
            "test": 8,
            "time": "2017-07-04T19:14:16.151Z",
            "someKey": "value+8"
          }
        }
      },
      {
        "_index": "log",
        "_type": "docker",
        "_id": "1",
        "_score": 1,
        "_source": {
          "@timestamp": "2017-07-04T19:14:16.0Z",
          "log": """{"test":7,"someKey":"value+7","time":"2017-07-04T19:14:16.149Z"}""",
          "parsed_log": {
            "test": 7,
            "time": "2017-07-04T19:14:16.149Z",
            "someKey": "value+7"
          }
        }
      }
    ]
  }
}

Here is my ES output config I'm using to test the pipeline feature (I emptied the index before running my test application and created the ES pipeline before launching the app I want logs added):

[OUTPUT]
    Name        es
    Match       docker
    Host        elastic
    Port        9200
    Index       log
    Type        docker
    Pipeline    mypipeline

I might be doing something wrong with the config, I guess. I get the events added into ES, but I do not see the pipeline applied. Added content does not get the "parsed_log" field and its content.

@edsiper ,

Actually, everything is working fine. My bad. I simply used the fluent/fluent-bit:0.12-dev Docker image (with the assumption that you might have docker image building on each code merge to master). And apparently, it was built 1 day ago.

Manually building the images using the Dockerfile provided solved the issues. Everything works great!

Thanks for a very fast turnaround!

awesome, 0.12 will be available shortly.

thanks again.

Just for the record: I STRONGLY suggest do filtering and parsing on Fluent Bit side, doing it on Elasticsearch is times expensive.

I'm sure Fluent Bit is much more efficient. I would say that ES Pipelines might work as a temporary workaround for some more complex scenarios till Fluent Bit adds all the features to make Pipelines not needed.

Was this page helpful?
0 / 5 - 0 ratings