Logstash: PQ is sometimes not cleaning up old pages.

Created on 22 Mar 2017 · 24 Comments · Source: elastic/logstash

A user has reported a problem where all data appears to be processed successfully by Logstash but many queue pages still exist on disk.

I am able to reproduce this.

Found in Logstash 5.2.1

Simplest reproduction case:

bin/logstash -e 'input { generator { threads => 2 }  } output { null { } }'
Labels: blocker, bug, persistent queues, v5.3.1, v5.4.0

All 24 comments

While debugging this, I found an interesting error:

Configuration:

bin/logstash -e 'input { generator { threads => 4 } } output { null { } }'
queue.type: persisted
queue.page_capacity: 1mb # A config for making the pages roll quickly. 
queue.max_bytes: 10mb # Not recommended to have pages this small.

```
Exception in thread "[main]>worker6" java.lang.IndexOutOfBoundsException: bitIndex < 0: -1
    at java.util.BitSet.set(BitSet.java:444)
    at org.logstash.ackedqueue.Page.ack(Page.java:104)
    at org.logstash.ackedqueue.Queue.ack(Queue.java:511)
    at org.logstash.ackedqueue.Batch.close(Batch.java:26)
    at org.logstash.ackedqueue.ext.JrubyAckedBatchExtLibrary$RubyAckedBatch.ruby_close(JrubyAckedBatchExtLibrary.java:80)
    at org.logstash.ackedqueue.ext.JrubyAckedBatchExtLibrary$RubyAckedBatch$INVOKER$i$0$0$ruby_close.call(JrubyAckedBatchExtLibrary$RubyAckedBatch$INVOKER$i$0$0$ruby_close.gen)
    at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:134)
    at org.jruby.ast.CallNoArgNode.interpret(CallNoArgNode.java:60)
```

This also reproduces it:

bin/logstash -e 'input { generator { threads => 2 } } output { null { } }'

I am unable to get bin/cpdump to run (it crashes trying to find rake in bundler), so I wrote a one-liner to watch the queue state:

while true; do clear; ruby -rpp -e 'Dir.glob("data/queue/checkpoint.*").sort_by { |x| x[/[0-9]+$/].to_i}.each { |checkpoint| data = File.read(checkpoint); version, page, firstUnackedPage, firstUnackedSeq, minSeq, elementCount, crc32 = data.unpack("nNNQ>Q>NN"); puts File.basename(checkpoint); pagepath = "data/queue/page.#{page}"; pagestat = File.stat(pagepath);  using = pagestat.blocks * 512; puts "Page: #{pagepath} (allocated #{pagestat.size} bytes, using #{using} #{using * 100 / pagestat.size}%)"; p(version: version, page: page, firstUnackedPage: firstUnackedPage, firstUnackedSeq: firstUnackedSeq, minSeq: minSeq, elementCount: elementCount, crc32: crc32) }'; sleep 1; done;

Reproduced on 5.3.0-SNAPSHOT

Reproduced on 5.4.0-SNAPSHOT

I believe the issue stems from writes not being thread-safe. seqNum is incremented in a non-thread-safe manner during writes, which can yield unexpected values and potential gaps between values.

This change should fix that portion of the problem (where there are multiple input threads):

+++ b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb
@@ -340,6 +340,7 @@ module LogStash; module Util

     class WriteClient
       def initialize(queue)
+        @mut = Mutex.new
         @queue = queue
       end

@@ -351,7 +352,7 @@ module LogStash; module Util
         if @queue.closed?
           raise QueueClosedError.new("Attempted to write an event to a closed AckedQueue")
         end
-        @queue.push(event)
+        @mut.synchronize { @queue.push(event) }
       end
       alias_method(:<<, :push)

I am not quite sure how this problem manifests itself with a single input worker and a split filter.

I'm going to test this patch shortly. Thanks @talevy!

@jordansissel coool. I'm not necessarily proposing we use this as a solution (if it is one), but it demonstrates that this portion of the write method should also be placed under a lock, just as the bottom portion is: https://github.com/elastic/logstash/blob/9f5506db078a17c584a86c02009759b45ead59c3/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java#L297-L302

Two threads can each increment seqNum with nextSeqNum, and the second can enter the mutex before the first, so the first acked seqNum does not agree with the constructor's initial minSeqNum of 0. This is just the first thread-safety issue I see; during experimental runs, I've also run into this after the first write.
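The interleaving described above can be sketched in Ruby. This is a toy model, not the Logstash code: `ToyPage`, `unsafe_interleaving`, and `safe_concurrent_writes` are hypothetical names, and the assumption that a page takes its minimum sequence number from the first element written is mine. The first function replays the bad ordering deterministically (both writers take a sequence number, then the second one writes first); the second assigns the sequence number inside the same lock as the write.

```ruby
# Hypothetical, simplified stand-in for a queue page (not the Logstash class).
class ToyPage
  attr_reader :min_seq, :written

  def initialize
    @min_seq = nil   # assumption: taken from the first element written
    @written = []
  end

  def write(seq)
    @min_seq ||= seq
    @written << seq
  end

  # Offset used to mark an element as acked; negative when seq < min_seq.
  def ack_offset(seq)
    seq - @min_seq
  end
end

# Unsafe ordering: sequence numbers are taken before the lock, and the
# second writer happens to enter the locked section first.
def unsafe_interleaving
  page = ToyPage.new
  next_seq = 0
  seq_a = (next_seq += 1)  # writer A is handed 1
  seq_b = (next_seq += 1)  # writer B is handed 2
  page.write(seq_b)        # B wins the lock and writes first
  page.write(seq_a)        # A writes afterwards
  page
end

# Safe ordering: the sequence number is assigned under the same lock as the write.
def safe_concurrent_writes(writers: 4, per_writer: 1000)
  page = ToyPage.new
  lock = Mutex.new
  next_seq = 0
  threads = Array.new(writers) do
    Thread.new do
      per_writer.times do
        lock.synchronize do
          next_seq += 1
          page.write(next_seq)
        end
      end
    end
  end
  threads.each(&:join)
  page
end

unsafe = unsafe_interleaving
puts "unsafe: min_seq=#{unsafe.min_seq}, ack offset for seq 1 = #{unsafe.ack_offset(1)}"
safe = safe_concurrent_writes
puts "safe: written in order? #{safe.written == (1..safe.written.size).to_a}"
```

In the toy model the out-of-order write yields an ack offset of -1 for the element with sequence number 1. Whether that is the exact path to the `bitIndex < 0: -1` exception reported earlier in this thread is a guess; the sketch only shows that such an offset is possible once sequence assignment and the write are not atomic together.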

I have a test case that reproduces this -- https://gist.github.com/jordansissel/518667ab4bd2df9d41244427917c9160

Next step is to check the places where we should be locking.

@talevy I think the WrappedAckedQueue should carry the lock since the WriteClient is created by queue.write_client which creates a new WriteClient for every invocation. I'm going to try locking the push calls on the parent queue to see if that helps first.

diff --git a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb    
index b733d22..f81e88b 100644                                                                                                   
--- a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb                                                                    
+++ b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb                                                                    
@@ -40,6 +40,7 @@ module LogStash; module Util                                                                         
       @queue = queue                                                                                                   
       @queue.open                                                                                                              
       @closed = Concurrent::AtomicBoolean.new(false)                                                                           
+      @mutex = Mutex.new                                                                                                       
       self                                                                                                                     
     end                                                                                                                        

@@ -53,7 +54,9 @@ module LogStash; module Util                                                                         
     # @param [Object] Object to add to the queue                                                                      
     def push(obj)                                                                                                      
       check_closed("write")                                                                                                    
-      @queue.write(obj)                                                                                                        
+      @mutex.synchronize do                                                                                                    
+        @queue.write(obj)                                                                                                      
+      end                                                                                                             
     end                                                                                                                
     alias_method(:<<, :push)                                                                                                   

This patch seems to help.

^^ Patch does not help. With 5 writers and 1 reader, this happens:

# Watch the checkpoints/pages for stats:
% while true; do echo "------"; ruby -rpp -e 'Dir.glob(ARGV[0] + "/checkpoint.*").sort_by { |x| x[/[0-9]+$/].to_i}.each { |checkpoint| data = File.read(checkpoint); version, page, firstUnackedPage, firstUnackedSeq, minSeq, elementCount, crc32 = data.unpack("nNNQ>Q>NN"); puts File.basename(checkpoint); pagepath = ARGV[0] + "/page.#{page}"; pagestat = File.stat(pagepath);  using = pagestat.blocks * 512; puts "Page: #{pagepath} (allocated #{pagestat.size} bytes, using #{using} #{using * 100 / pagestat.size}%)"; p(version: version, page: page, firstUnackedPage: firstUnackedPage, firstUnackedSeq: firstUnackedSeq, minSeq: minSeq, elementCount: elementCount, crc32: crc32) }' $(ls -dt /tmp/studtmp-* | head -1); sleep 1; done;


Page: /tmp/studtmp-e807288856092cd4673cf45b43227516cd1b372009317f2ab006540e80bf//page.21 (allocated 256000 bytes, using 229376 89%)
{:version=>1, :page=>21, :firstUnackedPage=>13, :firstUnackedSeq=>50001, :minSeq=>47986, :elementCount=>2015, :crc32=>225857278}
checkpoint.13
Page: /tmp/studtmp-e807288856092cd4673cf45b43227516cd1b372009317f2ab006540e80bf//page.13 (allocated 256000 bytes, using 258048 100%)
{:version=>1, :page=>13, :firstUnackedPage=>0, :firstUnackedSeq=>31991, :minSeq=>29706, :elementCount=>2285, :crc32=>2049478859}
checkpoint.14
-e:1:in `stat': No such file or directory @ rb_file_s_stat - /tmp/studtmp-e807288856092cd4673cf45b43227516cd1b372009317f2ab006540e80bf//page.14 (Errno::ENOENT)
    from -e:1:in `block in <main>'
    from -e:1:in `each'
    from -e:1:in `<main>'

Notable:

  • Sometimes a page is removed but its checkpoint is not removed.
  • Sometimes a page is left around even though it should have been acked already.

Checkpoint 13, above, is interesting:

  • First unacked: 31991
  • min seq: 29706
  • element count: 2285

The interesting part here is that first unacked - min seq == element count. 31991 - 29706 = 2285. This means this page is fully acked, and yet it remains on disk.
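The same check can be written as a small Ruby helper. This is a sketch that reuses the unpack string from the watcher one-liner in this thread; the `Checkpoint` struct and its `fully_acked?` method are hypothetical names, and the "fully acked" rule is simply the arithmetic stated above, not a call into Logstash.

```ruby
# Same field layout as the unpack string used by the watcher one-liner.
CHECKPOINT_FORMAT = "nNNQ>Q>NN"

Checkpoint = Struct.new(:version, :page, :first_unacked_page, :first_unacked_seq,
                        :min_seq, :element_count, :crc32) do
  # Build a Checkpoint from the raw bytes of a checkpoint file.
  def self.parse(bytes)
    new(*bytes.unpack(CHECKPOINT_FORMAT))
  end

  # Every element on the page has been acked when the first unacked
  # sequence number sits exactly element_count past min_seq.
  def fully_acked?
    first_unacked_seq - min_seq == element_count
  end
end

# Values reported for checkpoint.13 in this thread, packed back into bytes
# so the example runs without a queue directory on disk.
raw = [1, 13, 0, 31991, 29706, 2285, 2049478859].pack(CHECKPOINT_FORMAT)
cp = Checkpoint.parse(raw)
puts "page #{cp.page}: fully acked? #{cp.fully_acked?}"  # prints "page 13: fully acked? true"
```

Against a real queue directory, `Checkpoint.parse(File.binread(path))` would take the place of the packed sample, assuming the on-disk layout matches the one-liner's format string.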

@guyboertje Could I get your help on this?

@jordansissel
I think that the mutex in the ReadClient should also be used for the WriteClient.

My suggestion above does not work: when the queue is full, a writer blocks waiting on the Java notify while still holding the mutex, so a reader can never acquire the mutex to unblock the writer. Deadlock.
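A minimal Ruby sketch of that deadlock shape, under stated assumptions: `shared` stands in for a mutex shared by the read and write clients, and `space_freed` stands in for the queue's internal "no longer full" notification. Both names, and the helper method, are hypothetical. The reader uses `try_lock` so the script can report the problem and exit instead of hanging.

```ruby
require "thread"

# Returns whether a reader could take the shared mutex while a writer is
# parked on a full queue inside that same mutex.
def reader_can_lock_while_writer_waits?
  shared = Mutex.new             # hypothetical mutex shared by readers and writers
  space_freed = Queue.new        # stands in for the "queue no longer full" signal
  writer_holds_lock = Queue.new  # lets the main thread know the writer is inside

  writer = Thread.new do
    shared.synchronize do
      writer_holds_lock << true
      space_freed.pop            # queue is full: wait for a reader to free space
    end
  end

  writer_holds_lock.pop          # the writer now owns the shared mutex
  acquired = shared.try_lock     # a blocking lock here would wait forever
  shared.unlock if acquired

  space_freed << true            # release the toy writer so the script can exit
  writer.join
  acquired
end

puts "reader acquired shared mutex? #{reader_can_lock_while_writer_waits?}"  # prints false
```

The reader is the only party that could free space, yet it cannot get past the mutex the waiting writer holds. A wait that releases the lock while blocked (for example `ConditionVariable#wait` on that same mutex) avoids this particular shape.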

Per @talevy's comment, I agree that

long seqNum = nextSeqNum();

https://github.com/elastic/logstash/blob/3494f1e68c599a06118c7f0010c3b90e28ce3861/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java#L297

in the Queue.write() method needs to be inside the mutex lock. The Queue seqNum is not volatile and this can lead to seqNum race conditions with concurrent writers.

This is definitely a potential bug.

But otherwise the rest of the code prior to the mutex lock can stay where it is https://github.com/elastic/logstash/blob/3494f1e68c599a06118c7f0010c3b90e28ce3861/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java#L298-L302

Also, there is no need to add a mutex in the WriteClient, as long as the Queue.write method is properly thread safe (and it is not, per the bug in my previous comment). All that WriteClient#push does is ultimately defer to Queue.write(), so as long as the latter is thread safe, no mutex is necessary in the Ruby-side WriteClient.

@colinsurprenant I agree. I mention it in my above comments to demonstrate that behavior becomes more consistent when one does so, which means write is not thread safe.

@jordansissel about your comment that

  • Sometimes a page is removed but its checkpoint is not removed.

This can be normal behaviour. The code is non-obvious, but in https://github.com/elastic/logstash/blob/3494f1e68c599a06118c7f0010c3b90e28ce3861/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java#L577-L588 we delete a page data file if it is fully acked, but only delete the checkpoint if it is the first one. We decided to do this to make recovery easier, since we can then expect to always find and read contiguous checkpoints.

In other words, if a "middle" page is fully acked, its data file will be deleted but its checkpoint will stay on disk until it becomes the first checkpoint file, or part of a sequence of "empty" checkpoints starting from the first page.
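The rule can be modelled in a few lines of Ruby. This is a sketch of the behaviour as described in this comment, not a port of Queue.java: `ToyPageState` and `files_after_purge` are hypothetical names, the page numbers are made up, and the special handling of the head page is left out.

```ruby
# Hypothetical model: one entry per page, listed in page-number order.
ToyPageState = Struct.new(:page, :fully_acked)

# Returns [pages whose data file is deleted, pages whose checkpoint stays on disk].
def files_after_purge(pages)
  # Any fully acked page loses its data file.
  data_deleted = pages.select(&:fully_acked).map(&:page)
  # Checkpoints are only dropped from the front, for as long as the
  # leading pages are fully acked, so the remaining ones stay contiguous.
  checkpoints_left = pages.drop_while(&:fully_acked).map(&:page)
  [data_deleted, checkpoints_left]
end

pages = [
  ToyPageState.new(12, true),   # first page, fully acked: data and checkpoint go
  ToyPageState.new(13, false),
  ToyPageState.new(14, true),   # "middle" page: data goes, checkpoint stays
  ToyPageState.new(15, false),
]

data_deleted, checkpoints_left = files_after_purge(pages)
puts "data files deleted: #{data_deleted.inspect}"      # [12, 14]
puts "checkpoints left:   #{checkpoints_left.inspect}"  # [13, 14, 15]
```

In this model page 14 ends up with a checkpoint but no data file, which is the same shape as the `checkpoint.14` / missing `page.14` pair in the watcher output earlier in the thread.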

See this gist
With PR 6901 applied we still have problems.

Everything should now be fixed in #6901

Awaiting final review on #6901. Also note that @guyboertje spec has been updated to https://gist.github.com/colinsurprenant/b3ca535857a0e5d4ab48580fdfd653fe and all tests are passing.
We will merge into 5.4 and 5.3.1, and we should look at backporting to 5.2 and 5.1.

Yes it is, confirmed ++.

fixed by #6901 - will be part of upcoming 5.3.1 and 5.4.
