Logstash: When java execution is enabled, periodic_flushers and shutdown_flushers are never called

Created on 31 Jul 2020 · 6 Comments · Source: elastic/logstash

An aggregate filter that uses push_map_as_event_on_timeout relies on flush being called regularly. This does not occur if java execution is enabled. Originally mentioned here.

If java execution is disabled, then with this data

INFO - 12345 - Clicked One
INFO - 12345 - Clicked Two
INFO - 12345 - Clicked Three

and this configuration

input { stdin {} }
filter {
    grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ] }
    aggregate {
        task_id => "%{user_id}"
        code => "map['clicks'] ||= 0; map['clicks'] += 1;"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "user_id"
        timeout => 10
    }
}
output  { stdout { codec => rubydebug { metadata => false } } }

I get these messages

[2020-06-30T18:50:48,464][TRACE][logstash.filters.aggregate][main] Aggregate flush call with {:final=>false}
[2020-06-30T18:50:48,466][DEBUG][logstash.filters.aggregate][main] Aggregate remove_expired_maps call with '%{user_id}' pattern and 1 maps
[2020-06-30T18:50:48,487][DEBUG][logstash.filters.aggregate][main] Aggregate create_timeout_event call with task_id '12345'
[2020-06-30T18:50:48,513][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=12345

and an event that looks like

{
   "user_id" => "12345",
    "clicks" => 3,
"@timestamp" => 2020-06-30T22:50:48.490Z,
  "@version" => "1"
}

If I enable java_execution then those messages disappear (the flush never occurs) and the map contents are never pushed.

Java Execution bug

All 6 comments

@TheVastyDeep I believe this is fixed in the latest 7.9.0 release

I just upgraded to 7.9.0 and it does not appear to be fixed.

Apologies for the confusion - I missed that I had left the number of pipeline workers set to 2 when running tests on 7.9.0. You are correct: the behaviour is still broken when java execution is true and the number of pipeline workers is 1.

Which made me think to test whether pipeline.ordered is related. It is. If that is set to false then the flusher is called even when java execution is true.
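
As a rough sketch of that workaround, assuming the settings live in logstash.yml rather than being passed on the command line, the combination described in this thread would look like the fragment below. It is a stopgap based on the observation above, not a fix, and turning off ordering gives up the event-order guarantee that a single worker otherwise provides.

```yaml
# logstash.yml (sketch of the workaround from this thread, not a fix)
pipeline.java_execution: true   # the engine that shows the bug
pipeline.workers: 1             # a single worker, as in the reproductions (-w 1)
pipeline.ordered: false         # default is auto, which turns ordering on when workers is 1
```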

Another reproducer for this bug: it works on 7.6.0 and breaks on 7.7.0.


pipeline definition

input {
    generator {
        lines => [
            '{ "id": "1", "name": "Adam", "child": "Alex"}',
            '{ "id": "1", "name": "Adam", "child": "Galen"}'
        ]
        count => 1
        codec => "json"
    }
} 

filter {
    aggregate {
        task_id => "%{id}"
        code => "map['m_id'] = event.get('id')"
        push_previous_map_as_event => true
    }    
    mutate {
        remove_field => [
            "host", "sequence", "@version", "@timestamp"
        ]
    }
}

output {
    stdout {
        codec => "rubydebug"
    }
}

Launched with REE (the Ruby execution engine), using bin/logstash -f pipeline.conf -w 1 --java-execution=false, it writes the summary event at the end:

{
    "child" => "Alex",
     "name" => "Adam",
       "id" => "1"
}
{
    "child" => "Galen",
     "name" => "Adam",
       "id" => "1"
}
{
    "m_id" => "1",
    "tags" => [
        [0] "_aggregatefinalflush"
    ]
}

while with JEE (the Java execution engine), using bin/logstash -f pipeline.conf -w 1 --java-execution=true, it skips it:

{
     "name" => "Adam",
    "child" => "Alex",
       "id" => "1"
}
{
     "name" => "Adam",
    "child" => "Galen",
       "id" => "1"
}


REE debug log

[2020-08-25T14:39:15,506][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
{
     "name" => "Adam",
    "child" => "Alex",
       "id" => "1"
}
{
     "name" => "Adam",
    "child" => "Galen",
       "id" => "1"
}
[2020-08-25T14:39:15,679][DEBUG][logstash.pipeline        ][main] Flushing {:plugin=>#<LogStash::FilterDelegator:0x7d3a32db>}
[2020-08-25T14:39:15,691][DEBUG][logstash.filters.aggregate][main] Aggregate timeout for '%{id}' pattern: 1800 seconds
[2020-08-25T14:39:15,712][DEBUG][logstash.filters.aggregate][main] Aggregate remove_expired_maps call with '%{id}' pattern and 1 maps
[2020-08-25T14:39:15,728][DEBUG][logstash.filters.aggregate][main] Aggregate create_timeout_event call with task_id '1'
[2020-08-25T14:39:15,740][DEBUG][logstash.pipeline        ][main] Flushing {:plugin=>#<LogStash::FilterDelegator:0x7d3a32db>, :events=>[{"m_id"=>"1", "@version"=>"1", "tags"=>["_aggregatefinalflush"], "@timestamp"=>2020-08-25T12:39:15.729Z}]}
[2020-08-25T14:39:15,745][DEBUG][logstash.filters.mutate  ][main][811e511149838dd77541a5e6889e1e05751c0596ff9faa0c54e63a1de88e8890] filters/LogStash::Filters::Mutate: removing field {:field=>"host"}
[2020-08-25T14:39:15,747][DEBUG][logstash.filters.mutate  ][main][811e511149838dd77541a5e6889e1e05751c0596ff9faa0c54e63a1de88e8890] filters/LogStash::Filters::Mutate: removing field {:field=>"sequence"}
[2020-08-25T14:39:15,750][DEBUG][logstash.filters.mutate  ][main][811e511149838dd77541a5e6889e1e05751c0596ff9faa0c54e63a1de88e8890] filters/LogStash::Filters::Mutate: removing field {:field=>"@version"}
[2020-08-25T14:39:15,752][DEBUG][logstash.filters.mutate  ][main][811e511149838dd77541a5e6889e1e05751c0596ff9faa0c54e63a1de88e8890] filters/LogStash::Filters::Mutate: removing field {:field=>"@timestamp"}
[2020-08-25T14:39:15,759][DEBUG][logstash.pipeline        ][main] Pushing flushed events {:pipeline_id=>"main", :event=>#<LogStash::Event:0x1ad460b2>, :thread=>"#<Thread:0x5a718fd9 sleep>"}
[2020-08-25T14:39:15,762][DEBUG][logstash.pipeline        ][main] output received {"event"=>{"m_id"=>"1", "tags"=>["_aggregatefinalflush"]}}
{
    "m_id" => "1",
    "tags" => [
        [0] "_aggregatefinalflush"
    ]
}
[2020-08-25T14:39:15,826][DEBUG][logstash.filters.aggregate][main] Closing {:plugin=>"LogStash::Filters::Aggregate"}
[2020-08-25T14:39:15,834][DEBUG][logstash.filters.aggregate][main] Aggregate close call {:code=>"\n            map['m_id'] = event.get('id')\n        "}
[2020-08-25T14:39:15,846][DEBUG][logstash.pluginmetadata  ][main] Removing metadata for plugin f2d37046e5deca9df3cf714fe0b14968a5da3d91721d1eb6f68f90fa93cd4f70
[2020-08-25T14:39:15,848][DEBUG][logstash.filters.mutate  ][main] Closing {:plugin=>"LogStash::Filters::Mutate"}
[2020-08-25T14:39:15,851][DEBUG][logstash.pluginmetadata  ][main] Removing metadata for plugin 811e511149838dd77541a5e6889e1e05751c0596ff9faa0c54e63a1de88e8890
[2020-08-25T14:39:15,853][DEBUG][logstash.outputs.stdout  ][main] Closing {:plugin=>"LogStash::Outputs::Stdout"}
[2020-08-25T14:39:15,857][DEBUG][logstash.pluginmetadata  ][main] Removing metadata for plugin 351ee432fedc8639c7f5c0c3a099bcb31af2492bbf199249d3d6a394f883e591
[2020-08-25T14:39:15,862][INFO ][logstash.pipeline        ][main] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x5a718fd9 run>"}
[2020-08-25T14:39:16,001][DEBUG][logstash.instrument.periodicpoller.os] Stopping
[2020-08-25T14:39:16,018][DEBUG][logstash.instrument.periodicpoller.jvm] Stopping
[2020-08-25T14:39:16,020][DEBUG][logstash.instrument.periodicpoller.persistentqueue] Stopping
[2020-08-25T14:39:16,022][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] Stopping
[2020-08-25T14:39:16,038][DEBUG][logstash.agent           ] Shutting down all pipelines {:pipelines_count=>0}
[2020-08-25T14:39:16,044][DEBUG][logstash.agent           ] Converging pipelines state {:actions_count=>0}
[2020-08-25T14:39:16,123][INFO ][logstash.runner          ] Logstash shut down.


JEE debug log

[2020-08-25T14:38:39,378][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
{
    "child" => "Alex",
       "id" => "1",
     "name" => "Adam"
}
{
    "child" => "Galen",
       "id" => "1",
     "name" => "Adam"
}
[2020-08-25T14:38:39,644][DEBUG][logstash.filters.aggregate][main] Closing {:plugin=>"LogStash::Filters::Aggregate"}
[2020-08-25T14:38:39,647][DEBUG][logstash.filters.aggregate][main] Aggregate close call {:code=>"\n            map['m_id'] = event.get('id')\n        "}
[2020-08-25T14:38:39,653][DEBUG][logstash.pluginmetadata  ][main] Removing metadata for plugin f2d37046e5deca9df3cf714fe0b14968a5da3d91721d1eb6f68f90fa93cd4f70
[2020-08-25T14:38:39,659][DEBUG][logstash.filters.mutate  ][main] Closing {:plugin=>"LogStash::Filters::Mutate"}
[2020-08-25T14:38:39,665][DEBUG][logstash.pluginmetadata  ][main] Removing metadata for plugin 811e511149838dd77541a5e6889e1e05751c0596ff9faa0c54e63a1de88e8890
[2020-08-25T14:38:39,668][DEBUG][logstash.outputs.stdout  ][main] Closing {:plugin=>"LogStash::Outputs::Stdout"}
[2020-08-25T14:38:39,674][DEBUG][logstash.pluginmetadata  ][main] Removing metadata for plugin 351ee432fedc8639c7f5c0c3a099bcb31af2492bbf199249d3d6a394f883e591
[2020-08-25T14:38:39,678][DEBUG][logstash.javapipeline    ][main] Pipeline has been shutdown {:pipeline_id=>"main", :thread=>"#<Thread:0x7402537a run>"}
[2020-08-25T14:38:39,689][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2020-08-25T14:38:39,706][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2020-08-25T14:38:39,845][DEBUG][logstash.instrument.periodicpoller.os] Stopping
[2020-08-25T14:38:39,850][DEBUG][logstash.instrument.periodicpoller.jvm] Stopping
[2020-08-25T14:38:39,852][DEBUG][logstash.instrument.periodicpoller.persistentqueue] Stopping
[2020-08-25T14:38:39,853][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] Stopping
[2020-08-25T14:38:39,857][DEBUG][logstash.agent           ] Shutting down all pipelines {:pipelines_count=>0}
[2020-08-25T14:38:39,859][DEBUG][logstash.agent           ] Converging pipelines state {:actions_count=>0}
[2020-08-25T14:38:39,936][INFO ][logstash.runner          ] Logstash shut down.

This should be fixed by #12204; the fix will be available starting in 7.9.1.
