Logstash: Concurrency Issue in Queue (multiple consumers, one producer) <= Performance + Bug

Created on 8 Jun 2017 · 24 Comments · Source: elastic/logstash

Ran into this when trying to add buffering to the FileChannel IO approach.

If I run 2 consumers against one producer in the way shown in https://github.com/elastic/logstash/pull/7380, one of the consumers eventually locks up.

The reason for this is the following:

Assuming we're starting from an empty queue, both consumers will block on the last line of the snippet below:

    public Batch readBatch(int limit) throws IOException {
        Page p;

        lock.lock();
        try {
            while ((p = firstUnreadPage()) == null && !isClosed()) {
                try {
                    notEmpty.await();

Now this blocking is eventually unlocked by

 public long write(Queueable element) throws IOException

doing this:

            // if the queue was empty before write, signal non emptiness
            // a simple signal and not signalAll is necessary here since writing a single element
            // can only really enable a single thread to read a batch
            if (wasEmpty) { notEmpty.signal(); }

This is the only place in the Queue code that signals this lock. This means:

  • If a consumer calls readBatch() and neither drains the queue nor fires a new readBatch(), the Queue becomes locked, since all other consumers continue to wait for the signal (which is never fired by the write, since wasEmpty is always false). => bug(?) reproduced in #7380
  • The performance of multiple consumers in general is suboptimal: as soon as you start to fall behind, all consumption happens from a single consumer only.
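For illustration, here is a minimal sketch of the first point. It assumes a hypothetical, heavily simplified stand-in for the Queue (the class `LostWakeupSketch` and its methods are invented for this example and are not the Logstash code). It keeps only the two pieces quoted above: readers await `notEmpty` while the queue is empty, and `write` signals only when the queue was empty before the write. The two writes are done under a single lock hold to model, deterministically, the writer getting the lock twice before the signalled reader runs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified stand-in for the Queue; not the Logstash class. */
public class LostWakeupSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<Integer> elements = new ArrayDeque<>();

    /** Mirrors the quoted write() logic: signal only on the empty -> non-empty transition. */
    void write(int element) {
        lock.lock();
        try {
            boolean wasEmpty = elements.isEmpty();
            elements.add(element);
            if (wasEmpty) { notEmpty.signal(); }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks while the queue is empty, then takes exactly one element and stops. */
    int readOne() throws InterruptedException {
        lock.lock();
        try {
            while (elements.isEmpty()) { notEmpty.await(); }
            return elements.poll();
        } finally {
            lock.unlock();
        }
    }

    /** Returns {elements left in the queue, readers still parked on notEmpty}. */
    static int[] run() throws InterruptedException {
        LostWakeupSketch q = new LostWakeupSketch();
        AtomicInteger consumed = new AtomicInteger();
        for (int i = 0; i < 2; i++) {
            Thread reader = new Thread(() -> {
                try {
                    q.readOne();
                    consumed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            reader.setDaemon(true); // lets the JVM exit although one reader stays parked
            reader.start();
        }
        while (true) {
            q.lock.lock();
            try {
                // Proceed only once both readers are parked in notEmpty.await().
                if (q.lock.getWaitQueueLength(q.notEmpty) == 2) {
                    q.write(1); // queue was empty: one reader is signalled
                    q.write(2); // queue is not empty: nobody is signalled
                    break;
                }
            } finally {
                q.lock.unlock();
            }
            Thread.sleep(5);
        }
        // Wait for the signalled reader to take its single element.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        while (consumed.get() < 1 && System.nanoTime() < deadline) { Thread.sleep(5); }
        Thread.sleep(200); // extra time in which the other reader could run, if it were woken
        q.lock.lock();
        try {
            return new int[] { q.elements.size(), q.lock.getWaitQueueLength(q.notEmpty) };
        } finally {
            q.lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run();
        System.out.println("left in queue: " + result[0] + ", readers still waiting: " + result[1]);
    }
}
```

Under these assumptions the run should end with one element left in the queue and one reader still parked on the condition, which is the stuck state described above.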
bug

All 24 comments

@jsvd FYI :)

Interesting. But the notEmpty.await(); only happens when the queue is first empty, i.e. firstUnreadPage() == null.
But if 2 readers are waiting on notEmpty.await(); when a write occurs (which can only write a single element) and signals a reader, the only thing that can happen at this time, since we use a single mutex, is that the signalled reader will read that new element and the queue will return to an empty state. Not sure I see where this is not correct. If there is a problem it might be with the empty state management?

@colinsurprenant

If there is a problem it might be with the empty state management?

This I think. The problem starts when 2 readers start waiting on an empty state. Only one of them gets to read then. If that one never drains the queue, you get stuck here:

Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode):

"pool-2-thread-2" #14 prio=5 os_prio=31 tid=0x00007fe5609bf000 nid=0x5b03 waiting on condition [0x000070000ca6a000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076eb00df0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at org.logstash.ackedqueue.Queue.readBatch(Queue.java:450)
    at org.logstash.ackedqueue.QueueTest.lambda$queueStableUnderStress$6(QueueTest.java:578)
    at org.logstash.ackedqueue.QueueTest$$Lambda$3/1392906938.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

"pool-2-thread-1" #13 prio=5 os_prio=31 tid=0x00007fe55f8e7800 nid=0x5903 waiting on condition [0x000070000c967000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076eb23c38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

"Service Thread" #10 daemon prio=9 os_prio=31 tid=0x00007fe55d8ab000 nid=0x5503 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #9 daemon prio=9 os_prio=31 tid=0x00007fe55f817800 nid=0x5303 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #8 daemon prio=9 os_prio=31 tid=0x00007fe55f817000 nid=0x5103 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #7 daemon prio=9 os_prio=31 tid=0x00007fe55d868000 nid=0x4f03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #6 daemon prio=9 os_prio=31 tid=0x00007fe55d867000 nid=0x4d03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Monitor Ctrl-Break" #5 daemon prio=5 os_prio=31 tid=0x00007fe55f800000 nid=0x4b03 runnable [0x000070000c252000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    - locked <0x000000076eb166b8> (a java.io.InputStreamReader)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    - locked <0x000000076eb166b8> (a java.io.InputStreamReader)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:64)

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fe55d83c800 nid=0x4903 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fe55d818000 nid=0x3903 in Object.wait() [0x000070000c04c000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x000000076eb18470> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    - locked <0x000000076eb18470> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fe55c802800 nid=0x3703 in Object.wait() [0x000070000bf49000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x000000076eb28178> (a java.lang.ref.Reference$Lock)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
    - locked <0x000000076eb28178> (a java.lang.ref.Reference$Lock)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=31 tid=0x00007fe55f801800 nid=0x1c03 waiting on condition [0x000070000b52a000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076eb00e38> (a java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
    at java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at org.logstash.ackedqueue.QueueTest.lambda$queueStableUnderStress$7(QueueTest.java:602)
    at org.logstash.ackedqueue.QueueTest$$Lambda$4/793315160.apply(Unknown Source)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:479)
    at org.logstash.ackedqueue.QueueTest.queueStableUnderStress(QueueTest.java:606)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

"VM Thread" os_prio=31 tid=0x00007fe55d815800 nid=0x3503 runnable 

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fe55f003800 nid=0x2503 runnable 

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fe55f004800 nid=0x2703 runnable 

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fe55f005000 nid=0x2903 runnable 

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fe55f005800 nid=0x2b03 runnable 

"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fe55f006000 nid=0x2d03 runnable 

"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fe55f007000 nid=0x2f03 runnable 

"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fe55f007800 nid=0x3103 runnable 

"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fe55f008000 nid=0x3303 runnable 

"VM Periodic Task Thread" os_prio=31 tid=0x00007fe55d807000 nid=0x5703 waiting on condition 

JNI global references: 256

Heap
 PSYoungGen      total 76288K, used 60619K [0x000000076ab00000, 0x0000000770000000, 0x00000007c0000000)
  eden space 65536K, 81% used [0x000000076ab00000,0x000000076df52820,0x000000076eb00000)
  from space 10752K, 65% used [0x000000076eb00000,0x000000076f1e0770,0x000000076f580000)
  to   space 10752K, 0% used [0x000000076f580000,0x000000076f580000,0x0000000770000000)
 ParOldGen       total 175104K, used 16K [0x00000006c0000000, 0x00000006cab00000, 0x000000076ab00000)
  object space 175104K, 0% used [0x00000006c0000000,0x00000006c0004000,0x00000006cab00000)
 Metaspace       used 11327K, capacity 11560K, committed 11776K, reserved 1058816K
  class space    used 1389K, capacity 1477K, committed 1536K, reserved 1048576K

because there is no logical way for the notEmpty.signal to ever be called a second time to wake another reader up.

In steps:

  1. 2 Readers start and run into the notEmpty.await();
  2. Writer starts and wakes up one of the readers. The second one remains at the barrier and cannot be woken up unless the queue is drained again by reader 1 and wasEmpty becomes true
  3. Reader 1 stops for whatever reason while there are still remaining elements in the queue
  4. Stuck because you don't get the signal to wake up Reader 2

makes sense right?

But at step (2), since only 1 element can be written at a time, the queue will necessarily be drained again by the first woken-up reader, no? Unless there is a condition where this is not true (i.e. the following _readPageBatch(p, limit) will not drain, in which case that is the bug or wrong assumption), the only other possibility is that we don't correctly maintain the empty state, which is firstUnreadPage() == null (did not review yet).

From a higher level perspective, if the consumers are faster than the producers, it is normal that all consumers are waiting and only one will be woken up in turn, exactly when the next write occurs, while all others continue waiting.

@colinsurprenant but the writer can write more elements than the consumer can handle after the first write triggers Reader 1. When is Reader 2 ever woken up in this scenario?

Since there is a single global mutex, only one read or write can happen at a time.
A- after the first write that triggers reader1, when reader1 is finished, queue is empty again, lock is released.
B- at that point reader2 is still waiting
C- the only thing that can happen here is that a second write occurs (well, other readers could stack up, but that is not relevant here), which will wake up reader2, and the same sequence repeats.

I am not sure what you mean by "the writer can write more elements than the consumer can handle" ??

Eventually, if the consumers get busier, all the writes have woken all the waiting readers, and no readers are waiting, the readers will not re-issue reads fast enough, so producers will have time to fill the queue, at which point the queue is no longer in an empty state.

@colinsurprenant so concurrent reads are not supposed to be threadsafe on Queue and the test is invalid?

Not sure what you mean. The Queue is designed to be threadsafe and support concurrent reads. I did not look into the details of your tests (yet). Only discussing this particular code logic. I am not saying there is no problem (re-read my comments, I specifically said that if there was a problem it could be at an invalid _readPageBatch(p, limit) assumption or an invalid empty state handling). But I specifically do not see a logic problem in the notEmpty signal handling. Not seeing a logic problem does not necessarily mean that there isn't one, just that you have not yet demonstrated that there is one.

@colinsurprenant this is clearly a case of Reader 2 never being released, since using

            if (wasEmpty) { notEmpty.signalAll(); }

instead of

     if (wasEmpty) { notEmpty.signal(); }

in the write method fixes the issue. To me it is obvious that we see the sequence:

Reader 1 blocks on Condition
Reader 2 blocks on Condition
Writer writes one event to unblock Reader 1
Reader one takes a few ms to get back to reading, meanwhile writer writes some events
Reader 2 is blocked forever, even if Reader 1 stops (so long as it stops on a non-empty Queue)

where am I wrong here?
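To make the comparison concrete, here is a minimal sketch of that sequence with the signalAll() variant. It uses a hypothetical, simplified stand-in for the Queue (`SignalAllSketch` is invented for this example and is not the Logstash class), and it models "writer writes some events before Reader 1 gets back" by doing two writes under one lock hold.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified stand-in for the Queue; not the Logstash class. */
public class SignalAllSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<Integer> elements = new ArrayDeque<>();

    /** Same as the quoted write() logic, but waking every waiting reader. */
    void write(int element) {
        lock.lock();
        try {
            boolean wasEmpty = elements.isEmpty();
            elements.add(element);
            if (wasEmpty) { notEmpty.signalAll(); }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks while the queue is empty, then takes exactly one element and stops. */
    int readOne() throws InterruptedException {
        lock.lock();
        try {
            // Re-checking in a loop matters here: a woken reader may find the queue empty again.
            while (elements.isEmpty()) { notEmpty.await(); }
            return elements.poll();
        } finally {
            lock.unlock();
        }
    }

    /** Returns {elements left in the queue, readers still parked on notEmpty}. */
    static int[] run() throws InterruptedException {
        SignalAllSketch q = new SignalAllSketch();
        AtomicInteger consumed = new AtomicInteger();
        for (int i = 0; i < 2; i++) {
            Thread reader = new Thread(() -> {
                try {
                    q.readOne();
                    consumed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            reader.setDaemon(true);
            reader.start();
        }
        while (true) {
            q.lock.lock();
            try {
                // Proceed only once both readers are parked in notEmpty.await().
                if (q.lock.getWaitQueueLength(q.notEmpty) == 2) {
                    q.write(1); // queue was empty: both readers are woken
                    q.write(2); // queue is not empty: no signal needed this time
                    break;
                }
            } finally {
                q.lock.unlock();
            }
            Thread.sleep(5);
        }
        // Wait (with a timeout) for both readers to take their element.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        while (consumed.get() < 2 && System.nanoTime() < deadline) { Thread.sleep(5); }
        q.lock.lock();
        try {
            return new int[] { q.elements.size(), q.lock.getWaitQueueLength(q.notEmpty) };
        } finally {
            q.lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run();
        System.out.println("left in queue: " + result[0] + ", readers still waiting: " + result[1]);
    }
}
```

In this sketch both readers end up consuming one element each, so nothing is left behind. The cost of signalAll() is that every waiting reader is woken on each empty-to-non-empty transition, even when only one element is available.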

@original-brownbear when you say "Reader one takes a few ms to get back to reading, meanwhile writer writes some events"

"writer writes some events" cannot (should not) happen: only one writer can issue one write of one element, which should unblock Reader 2.

IF that does not happen, WHY is that? This is the question.

@colinsurprenant but how is it guaranteed that write and read happen in perfect w-r-w-r-w-r order in 100% of cases when they originate from different threads?

I feel we might be starting to get somewhere with this question. There is no guarantee that any read or write happens in order, obviously.

But when a write happens, it should systematically call if (wasEmpty) { notEmpty.signal(); }, which will wake up the next waiting reader. In other words, a write to an empty queue should necessarily be followed by a read because of the notEmpty.signal(). Looking at the code logic, that should always be the case, but if we prove that it does not happen, then we need to figure out why. So far nothing shows that the notEmpty.await() and notEmpty.signal() logic is wrong at a conceptual level. Because of this, we cannot just accept notEmpty.signalAll() as the solution: we do not even correctly understand where exactly the problem is.

@colinsurprenant the stacktrace showing the second reader blocking on notEmpty.await() proves that it is not being unblocked doesn't it?

Not sure? If the queue locks up completely it is a problem, but having readers blocked when readers are faster than writers is normal, no?

@colinsurprenant it locks completely and indefinitely (also tried higher timeouts in the test) with data left in the Queue in the test.

ok, then we still have work to do to understand why this is!

@colinsurprenant I worked the steps out above:

in the write method fixes the issue. To me it is obvious that we see the sequence:
Reader 1 blocks on Condition
Reader 2 blocks on Condition
Writer writes one event to unblock Reader 1
Reader one takes a few ms to get back to reading, meanwhile writer writes some events
Reader 2 is blocked forever, even if Reader 1 stops (so long as it stops on a non-empty Queue)

where am I wrong here?

Same question again, how are those steps wrong? I stepped through them with the debugger even ...

it is wrong in "writer writes some events". it can only be "writer writes one event" and that write should issue a notEmpty.signal() which should unblock Reader 2.

@colinsurprenant no there is no guarantee that the second write triggers Reader 2.
If Reader 1 is already trying to acquire the lock again on its .lock call (note that it has the time the write takes to unlock to get back to the lock), it has the exact same chance that Reader 2 has to acquire the lock in its await call! <= this is the heart of the issue.

  • We have no fair locking here.

All it takes is for two writes to go through back to back on an empty queue, because whatever the Reader did after it relinquished the lock took a few ns longer than usual (from GC or whatever), and the Queue is locked up.

@original-brownbear I think this is starting to look like a plausible explanation. Let me restate your hypothesis:

A- reader 1 waits on notEmpty
B- reader 2 waits on notEmpty
C- writer acquires lock, writes element and signals notEmpty, lock is unlocked
D- a reader is selected to be woken up and tries to acquire the lock
E- a writer is "simultaneously" trying to acquire the lock and wins, see (F)
F- writer acquires lock, writes element and does not signal notEmpty because queue is not empty at this point
G- readers are blocked.
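One common way to avoid this kind of lost wakeup without waking every reader is a cascading signal: a reader that leaves the queue non-empty passes the wakeup on to the next waiting reader. `java.util.concurrent.LinkedBlockingQueue` uses this pattern in `take()`. The sketch below applies it to a hypothetical, simplified stand-in for the Queue (`CascadingSignalSketch` is invented for this example). It is an assumption-laden illustration and makes no claim about what PR #7380 actually changed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified stand-in for the Queue; not the Logstash class. */
public class CascadingSignalSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<Integer> elements = new ArrayDeque<>();

    /** Unchanged from the quoted logic: signal only when the queue was empty. */
    void write(int element) {
        lock.lock();
        try {
            boolean wasEmpty = elements.isEmpty();
            elements.add(element);
            if (wasEmpty) { notEmpty.signal(); }
        } finally {
            lock.unlock();
        }
    }

    /** Takes one element; if more remain, hands the wakeup to the next waiting reader. */
    int readOne() throws InterruptedException {
        lock.lock();
        try {
            while (elements.isEmpty()) { notEmpty.await(); }
            int element = elements.poll();
            if (!elements.isEmpty()) { notEmpty.signal(); } // cascade the signal
            return element;
        } finally {
            lock.unlock();
        }
    }

    /** Returns {elements left in the queue, readers still parked on notEmpty}. */
    static int[] run() throws InterruptedException {
        CascadingSignalSketch q = new CascadingSignalSketch();
        AtomicInteger consumed = new AtomicInteger();
        for (int i = 0; i < 2; i++) {
            Thread reader = new Thread(() -> {
                try {
                    q.readOne();
                    consumed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            reader.setDaemon(true);
            reader.start();
        }
        while (true) {
            q.lock.lock();
            try {
                // Steps A and B: both readers are parked in notEmpty.await().
                if (q.lock.getWaitQueueLength(q.notEmpty) == 2) {
                    q.write(1); // step C: signals one reader
                    q.write(2); // step F: the writer wins the lock again, no signal
                    break;
                }
            } finally {
                q.lock.unlock();
            }
            Thread.sleep(5);
        }
        // Wait (with a timeout) for both readers to take their element.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        while (consumed.get() < 2 && System.nanoTime() < deadline) { Thread.sleep(5); }
        q.lock.lock();
        try {
            return new int[] { q.elements.size(), q.lock.getWaitQueueLength(q.notEmpty) };
        } finally {
            q.lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run();
        System.out.println("left in queue: " + result[0] + ", readers still waiting: " + result[1]);
    }
}
```

In this sketch the first reader finds a second element after its own read and signals again, so the second reader is released instead of staying blocked as in step G.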

Bingo, that makes sense. Let's see what the proper fix is now.

trivial fix as it turns out, see PR #7380 :)

perfect, thanks for your work on this :thumbsup: Happy we did not blindly go with a signalAll() solution, proper investigative work pays off.

Oops, I hit "Close and comment" - will let the merge perform the "official" close :P

@colinsurprenant all merged, thanks!
