A user has reported a problem where all data appears to be processed successfully by Logstash but many queue pages still exist on disk.
I am able to reproduce this.
Found in Logstash 5.2.1
Simplest reproduction case:
bin/logstash -e 'input { generator { threads => 2 } } output { null { } }'
While debugging this, I found an interesting error:
Configuration:
bin/logstash -e 'input { generator { threads => 4 } } output { null { } }'
```
queue.type: persisted
queue.page_capacity: 1mb # Small pages so they roll quickly.
queue.max_bytes: 10mb # Not recommended to have pages this small; used only to reproduce.
```
```
Exception in thread "[main]>worker6" java.lang.IndexOutOfBoundsException: bitIndex < 0: -1
        at java.util.BitSet.set(BitSet.java:444)
        at org.logstash.ackedqueue.Page.ack(Page.java:104)
        at org.logstash.ackedqueue.Queue.ack(Queue.java:511)
        at org.logstash.ackedqueue.Batch.close(Batch.java:26)
        at org.logstash.ackedqueue.ext.JrubyAckedBatchExtLibrary$RubyAckedBatch.ruby_close(JrubyAckedBatchExtLibrary.java:80)
        at org.logstash.ackedqueue.ext.JrubyAckedBatchExtLibrary$RubyAckedBatch$INVOKER$i$0$0$ruby_close.call(JrubyAckedBatchExtLibrary$RubyAckedBatch$INVOKER$i$0$0$ruby_close.gen)
        at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:134)
        at org.jruby.ast.CallNoArgNode.interpret(CallNoArgNode.java:60)
```
This also reproduces it:
bin/logstash -e 'input { generator { threads => 2 } } output { null { } }'
I am unable to get bin/cpdump to run (it crashes trying to find rake in bundler), so I wrote a one-liner to watch the queue state:
```
while true; do
  clear
  ruby -rpp -e '
    Dir.glob("data/queue/checkpoint.*").sort_by { |x| x[/[0-9]+$/].to_i }.each do |checkpoint|
      data = File.read(checkpoint)
      version, page, firstUnackedPage, firstUnackedSeq, minSeq, elementCount, crc32 = data.unpack("nNNQ>Q>NN")
      puts File.basename(checkpoint)
      pagepath = "data/queue/page.#{page}"
      pagestat = File.stat(pagepath)
      using = pagestat.blocks * 512
      puts "Page: #{pagepath} (allocated #{pagestat.size} bytes, using #{using} #{using * 100 / pagestat.size}%)"
      p(version: version, page: page, firstUnackedPage: firstUnackedPage, firstUnackedSeq: firstUnackedSeq, minSeq: minSeq, elementCount: elementCount, crc32: crc32)
    end
  '
  sleep 1
done
```
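For reference, the checkpoint parsing in the one-liner above can be pulled out into a small standalone sketch. The field layout is taken directly from the `"nNNQ>Q>NN"` unpack format used there (all big-endian: 16-bit version, 32-bit page number, 32-bit firstUnackedPage, two 64-bit sequence numbers, 32-bit element count, 32-bit CRC); the names follow the one-liner, not necessarily the Java source.

```ruby
# Sketch: read/write a v1 checkpoint file using the same field layout as
# the one-liner's unpack format ("nNNQ>Q>NN", big-endian throughout).
CHECKPOINT_FORMAT = "nNNQ>Q>NN"
CHECKPOINT_FIELDS = [:version, :page, :firstUnackedPage,
                     :firstUnackedSeq, :minSeq, :elementCount, :crc32]

def read_checkpoint(path)
  data = File.binread(path)
  CHECKPOINT_FIELDS.zip(data.unpack(CHECKPOINT_FORMAT)).to_h
end

def write_checkpoint(path, fields)
  packed = CHECKPOINT_FIELDS.map { |k| fields.fetch(k) }.pack(CHECKPOINT_FORMAT)
  File.binwrite(path, packed)
end
```

A round trip through these two functions preserves all seven fields, which makes it easy to fabricate checkpoints for experiments.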
Reproduced on 5.3.0-SNAPSHOT
Reproduced on 5.4.0-SNAPSHOT
I believe the issue stems from the writes not being thread-safe. seqNum is incremented in a non-thread-safe manner during writes, which can yield duplicate values and gaps in the sequence.
This change should fix that portion of the problem (the case with multiple input threads):
```
+++ b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb
@@ -340,6 +340,7 @@ module LogStash; module Util
     class WriteClient
       def initialize(queue)
+        @mut = Mutex.new
         @queue = queue
       end
@@ -351,7 +352,7 @@ module LogStash; module Util
         if @queue.closed?
           raise QueueClosedError.new("Attempted to write an event to a closed AckedQueue")
         end
-        @queue.push(event)
+        @mut.synchronize { @queue.push(event) }
       end
       alias_method(:<<, :push)
```
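The effect the patch aims for can be sketched with a plain counter, separate from Logstash code: an unsynchronized read-modify-write can lose increments under concurrency (readily observable on JRuby, where Ruby threads run in parallel), while wrapping the update in a Mutex makes the total deterministic.

```ruby
# Standalone sketch of the race the patch guards against: N threads each
# perform COUNT increments. Without a lock, the read-compute-write steps
# from different threads can interleave and lose updates; under
# Mutex#synchronize the final value is always exact. This mirrors
# wrapping @queue.push in @mut.synchronize.
class Counter
  attr_reader :value

  def initialize
    @value = 0
    @mutex = Mutex.new
  end

  # Unsafe: read, compute, and write are three separate steps.
  def unsafe_increment
    @value += 1
  end

  # Safe: the whole read-modify-write happens under the lock.
  def safe_increment
    @mutex.synchronize { @value += 1 }
  end
end

def run(counter, method, threads: 4, count: 10_000)
  threads.times.map {
    Thread.new { count.times { counter.public_send(method) } }
  }.each(&:join)
  counter.value
end
```

With `:safe_increment`, `run(Counter.new, :safe_increment)` always returns exactly `threads * count`; with `:unsafe_increment` it may fall short on a truly parallel runtime.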
I am not quite sure how this problem manifests with a single input worker but a split filter.
I'm going to test this patch shortly. Thanks @talevy!
@jordansissel coool. I'm not necessarily proposing we use this as a solution (if it is one), but it demonstrates that this portion of the write method: https://github.com/elastic/logstash/blob/9f5506db078a17c584a86c02009759b45ead59c3/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java#L297-L302
should also be placed under a lock, just as the bottom portion is. Two threads can increment seqNum via nextSeqNum, and the second could enter the mutex before the first, so the first acked seqNum would not agree with the constructor's initial minSeqNum of 0. This is just the first thread-safety issue I see; during experimental runs, I've also run into this after the first write.
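The interleaving described above can be replayed deterministically without real threads. This is a simulation with hypothetical names, not the actual Queue code: seqNum assignment happens outside the lock, but entry into the locked section can happen in the opposite order, so the page records sequence numbers out of order.

```ruby
# Deterministic replay of the race: two writers claim seqNums outside
# the lock, then enter the locked write section in swapped order. The
# "page" then sees seqNum 2 before seqNum 1, breaking the assumption
# that writes arrive in ascending order starting from minSeqNum.
class SimulatedQueue
  attr_reader :page_order

  def initialize
    @seq_num = 0
    @page_order = []
  end

  # Step 1 (outside the lock in the buggy code): claim a seqNum.
  def next_seq_num
    @seq_num += 1
  end

  # Step 2 (inside the lock): append to the page.
  def locked_write(seq)
    @page_order << seq
  end
end

q = SimulatedQueue.new
seq_a = q.next_seq_num  # writer A claims seqNum 1
seq_b = q.next_seq_num  # writer B claims seqNum 2
q.locked_write(seq_b)   # ...but B enters the mutex first
q.locked_write(seq_a)
q.page_order            # [2, 1] -- out of order relative to minSeqNum
```

Moving the `next_seq_num` call inside the locked section makes claiming and writing a single atomic step, which is exactly what pulling `nextSeqNum()` under the mutex does.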
I have a test case that reproduces this -- https://gist.github.com/jordansissel/518667ab4bd2df9d41244427917c9160
Next step is to check the places where we should be locking.
@talevy I think the WrappedAckedQueue should carry the lock since the WriteClient is created by queue.write_client which creates a new WriteClient for every invocation. I'm going to try locking the push calls on the parent queue to see if that helps first.
```
diff --git a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb
index b733d22..f81e88b 100644
--- a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb
+++ b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb
@@ -40,6 +40,7 @@ module LogStash; module Util
       @queue = queue
       @queue.open
       @closed = Concurrent::AtomicBoolean.new(false)
+      @mutex = Mutex.new
       self
     end
@@ -53,7 +54,9 @@ module LogStash; module Util
     # @param [Object] Object to add to the queue
     def push(obj)
       check_closed("write")
-      @queue.write(obj)
+      @mutex.synchronize do
+        @queue.write(obj)
+      end
     end
     alias_method(:<<, :push)
```
This patch seems to help.
^^ Patch does not help. With 5 writers and 1 reader, this happens:
```
# Watch the checkpoints/pages for stats:
% while true; do echo "------"; ruby -rpp -e 'Dir.glob(ARGV[0] + "/checkpoint.*").sort_by { |x| x[/[0-9]+$/].to_i}.each { |checkpoint| data = File.read(checkpoint); version, page, firstUnackedPage, firstUnackedSeq, minSeq, elementCount, crc32 = data.unpack("nNNQ>Q>NN"); puts File.basename(checkpoint); pagepath = ARGV[0] + "/page.#{page}"; pagestat = File.stat(pagepath); using = pagestat.blocks * 512; puts "Page: #{pagepath} (allocated #{pagestat.size} bytes, using #{using} #{using * 100 / pagestat.size}%)"; p(version: version, page: page, firstUnackedPage: firstUnackedPage, firstUnackedSeq: firstUnackedSeq, minSeq: minSeq, elementCount: elementCount, crc32: crc32) }' $(ls -dt /tmp/studtmp-* | head -1); sleep 1; done;

Page: /tmp/studtmp-e807288856092cd4673cf45b43227516cd1b372009317f2ab006540e80bf//page.21 (allocated 256000 bytes, using 229376 89%)
{:version=>1, :page=>21, :firstUnackedPage=>13, :firstUnackedSeq=>50001, :minSeq=>47986, :elementCount=>2015, :crc32=>225857278}
checkpoint.13
Page: /tmp/studtmp-e807288856092cd4673cf45b43227516cd1b372009317f2ab006540e80bf//page.13 (allocated 256000 bytes, using 258048 100%)
{:version=>1, :page=>13, :firstUnackedPage=>0, :firstUnackedSeq=>31991, :minSeq=>29706, :elementCount=>2285, :crc32=>2049478859}
checkpoint.14
-e:1:in `stat': No such file or directory @ rb_file_s_stat - /tmp/studtmp-e807288856092cd4673cf45b43227516cd1b372009317f2ab006540e80bf//page.14 (Errno::ENOENT)
        from -e:1:in `block in <main>'
        from -e:1:in `each'
        from -e:1:in `<main>'
```
Notable: for checkpoint.13 above, firstUnackedSeq - minSeq == elementCount (31991 - 29706 = 2285). This means the page is fully acked, and yet it remains on disk.
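The full-ack condition used in that arithmetic can be written down as a tiny predicate (a hypothetical helper for inspecting checkpoint dumps, not Logstash code): a page is fully acked when the count of acked elements, firstUnackedSeq - minSeq, accounts for the whole page.

```ruby
# Hypothetical helper: a page is fully acked when every element below
# firstUnackedSeq, i.e. firstUnackedSeq - minSeq elements, covers the
# page's entire elementCount.
def fully_acked?(first_unacked_seq:, min_seq:, element_count:)
  first_unacked_seq - min_seq == element_count
end
```

Plugging in the checkpoint.13 values (31991, 29706, 2285) returns true, confirming the page should have been reclaimed.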
@guyboertje Could I get your help on this?
@jordansissel
I think that the mutex in the ReadClient should also be used for the WriteClient.
My suggestion above does not work: when the queue is full, a writer blocks on the Java-side notify while still holding the mutex, so a reader can never acquire the mutex to unblock the writer. Deadlock.
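The deadlock arises because the Ruby-side Mutex and the Java-side lock the writer blocks on are two different locks, so blocking on "queue full" never releases the Ruby Mutex that the reader needs. The standard pattern avoids this by having the wait release the same lock the other side must acquire, which is what ConditionVariable#wait(mutex) does. A minimal bounded-queue sketch (standalone, not Logstash code):

```ruby
# Minimal bounded queue showing why the single-lock-plus-condvar pattern
# does not deadlock: ConditionVariable#wait releases @mutex while
# blocked, so a reader can acquire it, drain an element, and signal.
# In the buggy patch the Ruby Mutex and the Java-side lock are separate,
# so the blocked writer keeps holding the Ruby Mutex: deadlock.
class BoundedQueue
  def initialize(capacity)
    @capacity = capacity
    @items = []
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      # wait releases @mutex while blocked and reacquires it on wakeup
      @not_full.wait(@mutex) while @items.size >= @capacity
      @items << item
      @not_empty.signal
    end
  end

  def pop
    @mutex.synchronize do
      @not_empty.wait(@mutex) while @items.empty?
      item = @items.shift
      @not_full.signal
      item
    end
  end
end
```

A writer pushing 100 items through a capacity-2 queue completes as long as a reader keeps popping, precisely because each blocked side releases the shared lock while waiting.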
Per @talevy's comment, I agree that
```
long seqNum = nextSeqNum();
```
in the Queue.write() method needs to be inside the mutex lock. The Queue seqNum field is not volatile, and this can lead to seqNum race conditions with concurrent writers.
This is definitely a potential bug.
But otherwise the rest of the code prior to the mutex lock can stay where it is https://github.com/elastic/logstash/blob/3494f1e68c599a06118c7f0010c3b90e28ce3861/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java#L298-L302
Also, there is no need to add a mutex in the WriteClient as long as the Queue.write() method is properly thread safe (which it is not, per the bug in my previous comment). All WriteClient#push does is defer to Queue.write(), so as long as the latter is thread safe, no mutex is necessary on the Ruby-side WriteClient.
@colinsurprenant I agree. I mentioned it in my comments above to demonstrate that behavior becomes more consistent when one does so, which suggests write is not thread safe.
@jordansissel about your comment that
> Sometimes a page is removed but its checkpoint is not removed.
This can be normal behaviour. It's non-obvious code, but in https://github.com/elastic/logstash/blob/3494f1e68c599a06118c7f0010c3b90e28ce3861/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java#L577-L588 we delete a page's data file when it is fully acked, but delete its checkpoint only if it is the first one. We decided to do this to make recovery easier: we can then always expect to find and read contiguous checkpoints.
In other words, if a "middle" page is fully acked, its data file is deleted but its checkpoint stays on disk until it becomes the first checkpoint file, or part of a sequence of "empty" checkpoints starting from the first page.
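The retention rule described above can be sketched as a purge pass. This is a hypothetical simplification operating on in-memory page records rather than files (the real logic is in Queue.java, linked above), but it captures the rule: fully acked pages lose their data file immediately, while checkpoints are only dropped from the contiguous fully-acked prefix starting at the first page.

```ruby
# Hypothetical sketch of the checkpoint retention rule. Each record
# tracks whether the page's data file and checkpoint file still exist.
Page = Struct.new(:num, :fully_acked, :has_data, :has_checkpoint)

def purge(pages)
  # A fully acked page's data file is always deleted...
  pages.each { |p| p.has_data = false if p.fully_acked }
  # ...but checkpoints are dropped only from the contiguous fully-acked
  # prefix, so the remaining checkpoints stay contiguous for recovery.
  pages.take_while(&:fully_acked).each { |p| p.has_checkpoint = false }
  pages.reject { |p| p.fully_acked && !p.has_checkpoint }
end
```

With pages [1: unacked, 2: fully acked, 3: unacked], page 2 keeps its checkpoint (a "middle" page) but loses its data file; once page 1 is also fully acked, both 1 and 2 are dropped entirely.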
See this gist
With PR 6901 applied we still have problems.
Everything should now be fixed in #6901
Awaiting final review on #6901. Also note that @guyboertje spec has been updated to https://gist.github.com/colinsurprenant/b3ca535857a0e5d4ab48580fdfd653fe and all tests are passing.
We will merge in 5.4 and 5.3.1, and we should look at backporting to 5.2 and 5.1.
Yes it is, confirmed ++.
fixed by #6901 - will be part of upcoming 5.3.1 and 5.4.