I use Logstash to ship Kafka messages to Elasticsearch.
The error message is as follows:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "[main]
java.lang.OutOfMemoryError: Java heap space
Exception in thread "logstash_bizlog_-es01-1476176193455-43933fa1_watcher_executor" java.lang.OutOfMemoryError: Java heap space
Then I increased Xmx to 2g by setting LS_HEAP_SIZE="2g",
but I still get the same error, and the resulting heapdump.hprof is 2.1G.
By the way, logstash-2.1.1 works fine.
The config file is as follows:
input {
kafka {
codec => "line"
topic_id => "topic_xx_log"
group_id => "logstash_log"
zk_connect => "zk01:2181"
reset_beginning => false
consumer_threads => 10
rebalance_max_retries => 50
rebalance_backoff_ms => 20000
}
}
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:logging_time}\t%{NUMBER:biz_type:int}\t%{GREEDYDATA:data}\t%{IP:local_ip}"
}
}
if [biz_type] {
if ( [data] and [data] =~ /(.*)\{(.+)\}(.*)/ ) {
json {
source => "data"
remove_field => [ "data" ]
}
}
if [biz_type] == 10030 {
ruby {
code => ""
add_tag => "extra_plus"
}
} else if [biz_type] == 10037 {
ruby {
code => ""
add_tag => "extra_plus"
}
} else if [biz_type] == 10038 {
ruby {
code => ""
add_tag => "extra_plus"
}
} else if [biz_type] == 10047 {
ruby {
code => ""
add_tag => "extra_plus"
}
} else if [biz_type] == 10050 {
ruby {
code => ""
add_tag => "extra_plus"
}
} else if [biz_type] == 10052 {
ruby {
code => ""
add_tag => "extra_plus"
}
} else if [biz_type] == 10056 {
ruby {
code => ""
add_tag => "extra_plus"
}
} else if [biz_type] == 10002 {
ruby {
code => ""
add_tag => "extra_plus"
}
} else if [biz_type] == 10001 {
ruby {
code => ""
add_tag => "extra_plus"
}
} else if [biz_type] == 10018 {
ruby {
code => "event.cancel"
}
} else if [biz_type] == 10016 {
metrics {
meter => "error_%{biz_type}"
add_tag => "metric"
flush_interval => 5
clear_interval => 15
ignore_older_than => 10
}
}
if "metric" in [tags] {
ruby {
code => "event.cancel if event['error_10016']['rate_1m'] * 60 < 15"
}
}
ruby {
code => ""
remove_field => [ "message" ]
}
} else {
ruby {
code => ""
add_field => { "biz_type" => "unknown" }
}
}
ruby {
code => ""
remove_tag => [ "_jsonparsefailure"]
}
}
output {
elasticsearch {
hosts => ["xx-es02:9200","xx-es03:9200"]
document_type => "log_record_%{biz_type}"
index => "log-%{+YYYY-MM-dd}"
template_name => "template_revised"
template => "/data/applications/logstash/setting/xx-template.json"
template_overwrite => true
manage_template => true
}
if "extra_plus" in [tags] {
elasticsearch {
hosts => ["xx-es02:9200","xx-es03:9200"]
document_type => "item_%{biz_type}"
index => "%{biz_type}-%{+YYYY-MM}"
}
}
if "metric" in [tags] {
exec {
command => "/data/applications/logstash/conf/alert.sh"
}
}
}
After using MAT to analyse heapdump.hprof, I found that the biggest object is RubyObject; the list_objects screenshot is as follows:

Can you upload the heap dump somewhere so we can analyse it? I can send you a private link through email.
That's good, my email is [email protected]. The file size is about 136M after compression.
Link sent to your email, @yzhang226.
Heap dump file uploaded, good luck.
@yzhang226 The memory build-up is happening within the Kafka input stage, but it's still unclear whether the line codec is responsible.
In logstash 2.1.1 were you also using the line codec in the kafka plugin, or a different codec?
@jsvd Logstash 2.1.1 does NOT have the codec option; codec is a new option in logstash 2.4.
@jsvd According to the latest Logstash documentation, https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html, the default codec is json, but my Kafka input is line-based; that's why I set the codec.
@yzhang226 The line codec is only meant for a continuous stream of bytes that contains newlines ("\n"); it breaks the input buffer on each newline, and each slice becomes a new event.
If you're simply consuming discrete messages from Kafka, please try the "plain" codec instead, and check whether the problem is fixed.
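The explanation above can be illustrated with a minimal sketch (not the actual Logstash line codec implementation, just an assumed simplification of its buffering behavior): a line-oriented codec appends every incoming chunk to an internal buffer and only emits events when a newline is found, so payloads that never contain "\n" accumulate in memory indefinitely.

```ruby
# Minimal sketch of a line-splitting codec (hypothetical, for illustration;
# not the real logstash-codec-line). Chunks are appended to a buffer and an
# event is emitted per complete line; bytes without "\n" stay buffered.
class LineSplitter
  def initialize
    @buffer = +""
  end

  # Feed a chunk of bytes; yields one event per complete line found so far.
  def decode(chunk)
    @buffer << chunk
    while (idx = @buffer.index("\n"))
      yield @buffer.slice!(0..idx).chomp
    end
  end

  # Bytes still waiting for a newline that may never arrive -- this is
  # what grows without bound when the input contains no "\n".
  def pending_bytes
    @buffer.bytesize
  end
end

splitter = LineSplitter.new
events = []
splitter.decode("foo\nbar\nbaz") { |e| events << e }
# events now holds ["foo", "bar"]; "baz" (3 bytes) remains buffered
```

Kafka messages are already discrete, so a plain codec that emits one event per message has nothing to buffer between messages.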
@jsvd After I changed the codec to plain, Logstash no longer crashes.
It was my fault for using the wrong codec.
Thank you very much!