Hello,
I wonder if you might consider adding a Pipeline option to the ES Output Plugin.
This would enable more complex processing of data using the Pipeline functionality.
Some more info:
https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html
Here is an article that mentions how Logstash can be configured to use it:
https://www.elastic.co/blog/new-way-to-ingest-part-1
And here are a couple of options for how the Pipeline can be specified in the request:
https://stackoverflow.com/questions/41909936/elastic-search-bulk-api-pipeline-and-geo-ip
I know that some of the use cases for Pipelines will be covered by the upcoming release of new plugins, e.g. the upcoming filter_parser that would enable parsing a string into JSON. But it looks like ES Pipelines are currently somewhat more flexible and powerful.
I guess this functionality could be implemented relatively quickly; it would let users get more benefit from fluent-bit sooner and allow the development team to concentrate on other features.
Thanks,
Vlad
@spatialvlad thanks for the idea.
One concern that I see with this option is that Fluent Bit would need to issue one HTTP request per document/record. At the moment we use the Bulk API, where we ingest multiple documents in one request.
Do you know if this Pipeline stuff allows bulk import?
@edsiper ,
Switching to one request per document would not be good.
Luckily, it looks like the Pipelines work with the bulk import.
Here is a quote from the ES page that describes Pipeline:
To use a pipeline, you simply specify the pipeline parameter on an index or bulk request to tell the ingest node which pipeline to use.
The answer in the StackOverflow link I shared above has an additional example:
https://stackoverflow.com/a/41910185
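For illustration, passing a pipeline on a bulk request would look roughly like this (index, type, and pipeline names here are just placeholders):
POST /_bulk?pipeline=my-pipeline
{"index":{"_index":"test","_type":"doc"}}
{"message":"hello"}
So the Bulk API should keep working as-is; the pipeline just runs on each document as it is ingested.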
Thanks for your prompt reply.
I've pushed a commit that adds pipeline support, it's on the out_es_pipeline branch.
To test it, just set a value for _pipeline_ in the output configuration.
Here it worked well; let me know how it works on your side.
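For example, assuming you already created a pipeline named mypipeline in Elasticsearch:
[OUTPUT]
    Name     es
    Match    *
    Host     127.0.0.1
    Port     9200
    Pipeline mypipeline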
Wow, that was fast, thank you!
I would be glad to check the branch on my machine, but so far I have not been able to build a Docker image with this branch. Do you have instructions somewhere on what to modify in the Dockerfile to use a development branch? I tried to use the Dockerfile from the 0.12-dev branch, modifying the FLB_TARBALL url. But apparently, that's not enough.
I get an error in the process:
/bin/sh: 1: cd: can't cd to fluent-bit-master/build/
ERROR: Service 'fluentd' failed to build: The command '/bin/sh -c apt-get -qq update && apt-get install -y -qq ca-certificates build-essential cmake make sudo wget unzip && apt-get install -y -qq --reinstall lsb-base lsb-release && wget -O "/tmp/fluent-bit-master.zip" ${FLB_TARBALL} && cd /tmp && unzip "fluent-bit-master.zip" && cd "fluent-bit-master"/build/ && cmake -DFLB_DEBUG=On -DFLB_TRACE=On -DFLB_JEMALLOC=On -DFLB_BUFFERING=On ../ && make && install bin/fluent-bit /fluent-bit/bin/ && apt-get remove --purge --auto-remove -y -qq
build-essential cmake make wget unzip && rm -rf /tmp/*' returned a non-zero code: 2
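For reference, the only change I made was the tarball URL, e.g.:
ENV FLB_TARBALL https://github.com/fluent/fluent-bit/archive/out_es_pipeline.zip
My guess is that the cd path would also need to match the unzip directory (GitHub branch archives unpack into fluent-bit-<branch> rather than fluent-bit-master), but I have not verified that.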
I've rebased the changes on GIT master, try again.
Thanks.
For some reason, it does not seem to be applying the pipeline.
Could it be that this line in the pull request should be different?
https://github.com/fluent/fluent-bit/commit/e63ec9550b323ab22f7435edc3fd8ea641ae1934
I guess it should be:
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/_bulk?pipeline=%s", tmp);
instead of
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/_bulk/?pipeline=%s", tmp);
i.e., /_bulk? instead of /_bulk/?
I tested here:
$ curl http://127.0.0.1:9200/fluent-bit/_search?pretty
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "fluent-bit",
"_type" : "flb_type",
"_id" : "AV0OvxVTIOxPTN-detZ8",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2017-07-04T17:56:26.0Z",
"rand_value" : "10781455129980707414",
"foo" : "bar"
}
},
{
"_index" : "fluent-bit",
"_type" : "flb_type",
"_id" : "AV0OzZeNIOxPTN-detZ-",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2017-07-04T18:12:17.0Z",
"rand_value" : 15417425886927737057
}
}
]
}
}
From an HTTP protocol perspective, there is no distinction between /_bulk and /_bulk/ for the Bulk API.
My pipeline file is:
{
"description" : "describe pipeline",
"processors" : [
{
"set" : {
"field": "foo",
"value": "bar"
}
}
]
}
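A pipeline file like this can be registered in Elasticsearch with something along these lines (the pipeline id and file name are placeholders):
$ curl -XPUT -H 'Content-Type: application/json' http://127.0.0.1:9200/_ingest/pipeline/test-pipeline -d @pipeline.json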
I tested the behavior manually. First, a bulk request without a pipeline:
POST /log/docker/_bulk
{"index":{"_id":"1"}}
{"@timestamp": "2017-07-04T19:14:16.0Z","log": """{"test":7,"someKey":"value+7","time":"2017-07-04T19:14:16.149Z"}""" }
{"index":{"_id":"2"}}
{"@timestamp": "2017-07-04T19:14:16.0Z","log": """{"test":8,"someKey":"value+8","time":"2017-07-04T19:14:16.151Z"}""" }
Content added to the log index:
GET /log/_search?pretty=true&q=*:*
{
"took": 12,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index": "log",
"_type": "docker",
"_id": "2",
"_score": 1,
"_source": {
"@timestamp": "2017-07-04T19:14:16.0Z",
"log": """{"test":8,"someKey":"value+8","time":"2017-07-04T19:14:16.151Z"}"""
}
},
{
"_index": "log",
"_type": "docker",
"_id": "1",
"_score": 1,
"_source": {
"@timestamp": "2017-07-04T19:14:16.0Z",
"log": """{"test":7,"someKey":"value+7","time":"2017-07-04T19:14:16.149Z"}"""
}
}
]
}
}
Then I created the pipeline:
PUT _ingest/pipeline/mypipeline
{
"description" : "Convert String to JSON",
"processors" : [
{
"json" : {
"field" : "log",
"target_field" : "parsed_log"
}
}
]
}
Then the same bulk request, this time with the pipeline:
POST /log/docker/_bulk/?pipeline=mypipeline
{"index":{"_id":"1"}}
{"@timestamp": "2017-07-04T19:14:16.0Z","log": """{"test":7,"someKey":"value+7","time":"2017-07-04T19:14:16.149Z"}""" }
{"index":{"_id":"2"}}
{"@timestamp": "2017-07-04T19:14:16.0Z","log": """{"test":8,"someKey":"value+8","time":"2017-07-04T19:14:16.151Z"}""" }
Indexed content (I deleted the old records first). You can see that the "parsed_log" field is added:
GET /log/_search?pretty=true&q=*:*
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index": "log",
"_type": "docker",
"_id": "2",
"_score": 1,
"_source": {
"@timestamp": "2017-07-04T19:14:16.0Z",
"log": """{"test":8,"someKey":"value+8","time":"2017-07-04T19:14:16.151Z"}""",
"parsed_log": {
"test": 8,
"time": "2017-07-04T19:14:16.151Z",
"someKey": "value+8"
}
}
},
{
"_index": "log",
"_type": "docker",
"_id": "1",
"_score": 1,
"_source": {
"@timestamp": "2017-07-04T19:14:16.0Z",
"log": """{"test":7,"someKey":"value+7","time":"2017-07-04T19:14:16.149Z"}""",
"parsed_log": {
"test": 7,
"time": "2017-07-04T19:14:16.149Z",
"someKey": "value+7"
}
}
}
]
}
}
Here is the ES output config I'm using to test the pipeline feature (I emptied the index before running my test application, and created the ES pipeline before launching the app whose logs I want added):
[OUTPUT]
    Name     es
    Match    docker
    Host     elastic
    Port     9200
    Index    log
    Type     docker
    Pipeline mypipeline
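As a sanity check, the pipeline definition can be fetched back from ES, e.g.:
GET /_ingest/pipeline/mypipeline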
I guess I might be doing something wrong with the config. The events get added into ES, but I do not see the pipeline applied: the added content does not get the "parsed_log" field and its contents.
@edsiper ,
Actually, everything is working fine. My bad. I simply used the fluent/fluent-bit:0.12-dev Docker image (on the assumption that you might build a Docker image on each merge to master). And apparently, it was built 1 day ago.
Manually building the image using the Dockerfile provided solved the issue. Everything works great!
Thanks for a very fast turnaround!
awesome, 0.12 will be available shortly.
thanks again.
Just for the record: I STRONGLY suggest doing filtering and parsing on the Fluent Bit side; doing it in Elasticsearch is many times more expensive.
I'm sure Fluent Bit is much more efficient. I would say that ES Pipelines might work as a temporary workaround for some more complex scenarios until Fluent Bit adds the features that make Pipelines unnecessary.
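For reference, once the parser filter lands, the Fluent Bit side equivalent might look roughly like this (a sketch based on my understanding of the upcoming filter, not final syntax; it assumes a 'json' parser is defined in the parsers file):
[FILTER]
    Name     parser
    Match    docker
    Key_Name log
    Parser   json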