Elasticsearch: [ Request Reopen ] Java application using BulkProcessing hangs if elasticsearch hangs

Created on 7 Sep 2017 · 3 comments · Source: elastic/elasticsearch

Elasticsearch version (bin/elasticsearch --version):
Tested across all major 5.x versions [5.1.2, 5.2.x, 5.3.x, ...]

Plugins installed: [ defaults ]

JVM version (java -version):
Java(TM) SE Runtime Environment (build 1.8.0_144-b01)

OS version (uname -a if on a Unix-like system): CentOS 6

Description of the problem including expected versus actual behavior: The issue occurs when using the Java API TransportClient in client-side applications. If ES hangs or goes down, all bulk processor threads get deadlocked.

This issue was already discussed here: https://discuss.elastic.co/t/java-application-using-bulkprocessing-hangs-if-elasticsearch-hangs/36960/8

We are facing this issue as a result of ES going down due to https://github.com/elastic/elasticsearch/issues/24359.

Could a timed-wait semaphore be implemented as a feature, as described here: https://discuss.elastic.co/t/java-application-using-bulkprocessing-hangs-if-elasticsearch-hangs/36960/2, with the wait timeout exposed as a parameter? (Or, of course, a better way of correcting this.)

Is there any workaround possible from application code, outside of the driver, as a quick fix in case it can't be handled at the driver level?

Steps to reproduce:

  1. ES is running and bulk insertion from the application is in progress.
  2. The ES nodes restart.
  3. Deadlock on the application side (a sketch of the setup involved follows).
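
For reference, a minimal sketch of the kind of TransportClient + BulkProcessor setup that ends up deadlocked. The host, index name, document shape, and builder settings here are illustrative assumptions, not taken from the original report:

import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class BulkIndexer {
    public static void main(String[] args) throws Exception {
        // Hypothetical host/port, index, and type names.
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

        BulkProcessor processor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override public void beforeBulk(long executionId, BulkRequest request) { }
            @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }
            @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
        })
        .setBulkActions(1000)
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(1)
        .build();

        // Step 1: bulk insertion in progress. If the ES nodes restart mid-flush (step 2),
        // the flush blocks on BulkRequestHandler's internal semaphore/latch and the
        // application threads stay parked forever (step 3).
        for (int i = 0; i < 1_000_000; i++) {
            processor.add(new IndexRequest("history", "doc", String.valueOf(i))
                    .source(Collections.singletonMap("field", i)));
        }
        processor.awaitClose(1, TimeUnit.MINUTES);
        client.close();
    }
}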

Thread dump from the application side:

"HistoryCachedExecutor-125" #429 daemon prio=5 os_prio=0 tid=0x00007fd268dea000 nid=0x8fe1 waiting on condition [0x00007fd0de2e3000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000744a1ca98> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
        at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

Any help with a fix or a workaround for this would be really appreciated. I'm glad to provide any further info needed on this.


All 3 comments

+1

The issue here is that we don't have a timeout on the connection for bulk requests, and I think it's OK for bulk not to time out by default. If you want to make sure your bulk requests do actually time out, you can do that by adding a plugin to the TransportClient. The plugin must implement NetworkPlugin; there you can wrap an async sender, which is basically an interceptor of the request before it gets executed. There you can take advantage of the TransportRequestOptions, which allow you to specify a timeout conditionally for bulk. I know this is not perfect, but given that we are fading out the transport client and the rest client has a global timeout, I think we are generally OK. If you need help adding this workaround I am happy to help, but let's move it to the discuss forum.
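
A rough, untested sketch of that interceptor approach follows. The plugin class name, the 30-second timeout, and the exact getTransportInterceptors/AsyncSender signatures are assumptions; the interceptor API changed slightly across the 5.x line, so check the signatures against your client version:

import java.util.Collections;
import java.util.List;

import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;

// Hypothetical client-side plugin: intercepts outgoing requests and forces a
// timeout on bulk requests that don't already carry one.
public class BulkTimeoutPlugin extends Plugin implements NetworkPlugin {

    // Assumed value; pick whatever makes sense for your bulk sizes.
    private static final TimeValue BULK_TIMEOUT = TimeValue.timeValueSeconds(30);

    @Override
    public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
                                                               ThreadContext threadContext) {
        return Collections.singletonList(new TransportInterceptor() {
            @Override
            public AsyncSender interceptSender(AsyncSender sender) {
                return new AsyncSender() {
                    @Override
                    public <T extends TransportResponse> void sendRequest(Transport.Connection connection,
                                                                          String action,
                                                                          TransportRequest request,
                                                                          TransportRequestOptions options,
                                                                          TransportResponseHandler<T> handler) {
                        if (BulkAction.NAME.equals(action) && options.timeout() == null) {
                            // Rebuild the options with a timeout so a hung node eventually
                            // fails the request instead of blocking the BulkProcessor forever.
                            options = TransportRequestOptions.builder()
                                    .withType(options.type())
                                    .withCompress(options.compress())
                                    .withTimeout(BULK_TIMEOUT)
                                    .build();
                        }
                        sender.sendRequest(connection, action, request, options, handler);
                    }
                };
            }
        });
    }
}

The plugin would then be registered when building the client, e.g. new PreBuiltTransportClient(settings, BulkTimeoutPlugin.class). A bulk request that exceeds the timeout should then fail with a timeout exception on the response handler, which in turn should let the BulkProcessor release its internal semaphore.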

The 6.3.1 high-level REST client also has the same problem: neither onResponse nor onFailure is called for the high-level REST client's async bulk request.

The thread dump looks as follows:

"elasticsearch[scheduler][T#1]" #81 daemon prio=5 os_prio=0 tid=0x00007f6b459eb000 nid=0x83 waiting on condition [0x00007f6a1c7fb000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000068d0594e8> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86)
    at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
    at org.elasticsearch.action.bulk.BulkProcessor.access$300(BulkProcessor.java:51)
    at org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:373)
    - locked <0x00000006c01f83b8> (a org.elasticsearch.action.bulk.BulkProcessor)
    at org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:182)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - <0x00000006c0041380> (a java.util.concurrent.ThreadPoolExecutor$Worker)