ksqlDB leaks state stores for transient queries

Created on 5 Oct 2020 · 6 Comments · Source: confluentinc/ksql

On a long-running cluster, I noted that there are many state stores on disk for transient queries:

root@ksql-1:/mnt/data/data/ksql-state# ls -al | grep transient | grep -oE 'Sep [0-9]+' | sort | uniq -c
     76 Sep 23
     91 Sep 24
    100 Sep 25
     96 Sep 26
    102 Sep 27
     88 Sep 28
     89 Sep 29
     11 Sep 30
root@ksql-1:/mnt/data/data/ksql-state# ls -al | grep transient | grep -E 'Sep [0-9]+' | head -5
4096 Sep 29 16:29 _confluent-ksql-pksqlc-gn6rmtransient_1012238981171586952_1601396723718
4096 Sep 28 03:38 _confluent-ksql-pksqlc-gn6rmtransient_1012260790516886546_1601264083968
4096 Sep 26 13:13 _confluent-ksql-pksqlc-gn6rmtransient_101964711127807535_1601125759174
4096 Sep 24 17:48 _confluent-ksql-pksqlc-gn6rmtransient_1020128588393177207_1600969460743
4096 Sep 30 00:33 _confluent-ksql-pksqlc-gn6rmtransient_1025443061838213674_1601425747128

These state stores all appear to have the same size (mostly), so I suspect they're for failed queries:

root@ksql-1:/mnt/data/data/ksql-state# du -sh * | grep transient | grep -oE '^[0-9.]+K' | sort | uniq -c
      4 164K
    640 4.0K

Furthermore, I noticed that the Kafka cluster was also overwhelmed with too many partitions for internal topics that were not cleaned up - all of them belonged to the transient queries above. The ksqlDB server was degraded when these queries were initially issued.

P0 bug

All 6 comments

I had a look at our query-handling code and nothing obvious stood out to me that would explain this leak. We do have lots of logs around this codepath that would help, but they have already expired from the log store. So let's keep an eye out for a recurrence and debug when there are more logs.

Did a quick sanity-check here. It looks like we're leaking just the state store directory every time a transient query is terminated. The actual state store (directory contents) and internal topics are removed.

Looks like in some cases we _do_ leak internal topics and state stores. I think there are multiple issues going on here:

  1. On all terminates the state store directory for the query is leaked. Empty directories are cheap - they don't take up much space and modern filesystems (e.g. ext4) should be able to handle tons of entries without hurting performance. So the main problem here is that it pollutes the namespace of the state directory.

  2. If the transient query is rebalancing when it is closed, and the consumer does not detect that it's been closed before the close timeout, then the rebalance code will recreate the internal topics and state stores after ksql deletes them, and they will be leaked. This can be seen in the following log sequence for a query on the above cluster that leaked internal topics:

close during rebalance:

October 7th 2020, 03:45:40.508 | stream-client [_confluent-ksql-pksqlc-gn6rmtransient_5024434338661659353_1602042100594-f13115ab-bbdf-4a08-a865-87a64a0ef053] State transition from REBALANCING to PENDING_SHUTDOWN

consumer doesn't exit for a while

October 7th 2020, 03:50:40.328 | stream-client [_confluent-ksql-pksqlc-gn6rmtransient_5024434338661659353_1602042100594-f13115ab-bbdf-4a08-a865-87a64a0ef053] Streams client cannot stop completely within the timeout

At this point the close times out and returns to ksql, which cleans up the internal topics/stores. But the rebalance callbacks then recreate the internal topics. (I only see error messages for the topic creation, but that may just be because success is not logged. The fact that streams is trying to recreate the topic, and that the topic was leaked, makes me think it succeeded.)

October 7th 2020, 03:50:59.583 | stream-thread [ksql-workers-75] Could not create topic _confluent-ksql-pksqlc-gn6rmtransient_5024434338661659353_1602042100594-Join-repartition. Topic is probably marked for deletion (number of partitions is unknown). Will retry to create this topic in 500 ms (to let broker finish async delete operation first). Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic '_confluent-ksql-pksqlc-gn6rmtransient_5024434338661659353_1602042100594-Join-repartition' already exists.

So here, we need to think about:

  • how can we prevent close exiting early when there's an ongoing rebalance? It might make sense to make the close timeout a function of the consumer timeouts. Maybe 2-3x the poll or request timeout? @guozhangwang do you think this makes sense?
  • whatever we do, there's always the possibility of these getting leaked - we might hit an exception in the close/cleanup path, or the node might just get bounced. We need some background process for cleaning these up.
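The background cleanup mentioned in the second point could look roughly like the sketch below. It is only an illustration, not ksqlDB code: the class name `TransientStateDirSweeper`, the `minAge` parameter, and the assumption that transient query directories can be recognized by `transient_` in their name (as in the listing earlier in this issue) are all hypothetical. A real implementation would also need to confirm that the query is no longer running before deleting anything.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ksqlDB: removes empty, stale state
// directories left behind by transient queries.
class TransientStateDirSweeper {

  // Assumption: transient query directories contain this marker in their
  // name, as in the directory listing shown earlier in this issue.
  private static final String TRANSIENT_MARKER = "transient_";

  // Deletes empty transient directories under stateDir whose last-modified
  // time is at least minAge before now. Returns the directories removed.
  static List<Path> sweep(Path stateDir, Duration minAge, Instant now) throws IOException {
    List<Path> removed = new ArrayList<>();
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(stateDir)) {
      for (Path entry : entries) {
        String name = entry.getFileName().toString();
        if (!Files.isDirectory(entry) || !name.contains(TRANSIENT_MARKER)) {
          continue; // not a transient query directory
        }
        Instant modified = Files.getLastModifiedTime(entry).toInstant();
        if (modified.plus(minAge).isAfter(now)) {
          continue; // too recent; the query may still be starting or stopping
        }
        if (isEmpty(entry)) {
          // Files.delete refuses to remove a non-empty directory, so a
          // directory that gains content concurrently is not wiped.
          Files.delete(entry);
          removed.add(entry);
        }
      }
    }
    return removed;
  }

  private static boolean isEmpty(Path dir) throws IOException {
    try (DirectoryStream<Path> children = Files.newDirectoryStream(dir)) {
      return !children.iterator().hasNext();
    }
  }
}
```

Only empty directories are touched here, which matches the "leaked directory, contents already removed" case. Directories that still hold state, and leaked internal topics, would need separate handling.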

Let's use this issue to track the first issue - tuning timeouts to try to avoid this problem.

https://github.com/confluentinc/ksql/issues/4009 already tracks general cleanup.

We can open a streams ticket for cleaning up the query state directories. (https://issues.apache.org/jira/browse/KAFKA-10585)

The situation is a bit different with processing-modes here:

  • at-least-once / exactly-once-beta: N producers, 2 * N consumers, 1 admin
  • exactly-once: M producers, 2 * N consumers, 1 admin

Where N == number of threads (in ksql, now it's 1), and M == number of assigned tasks.

And each closure of these clients is a blocking call, AND they are called sequentially. When we set the closing timeout, we should basically sum them up to be safe.
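To make the "sum them up" idea concrete, here is a minimal sketch of the worst-case close time when the clients are closed one after another. The class `CloseBudget`, its method names, and the per-client close durations passed in are hypothetical placeholders, not values or APIs taken from ksqlDB or Kafka Streams. Only the client counts per processing mode come from the comment above.

```java
import java.time.Duration;

// Hypothetical sketch: upper bound on the time needed to close all clients
// of a query when each close blocks and the closes run sequentially.
class CloseBudget {

  enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE }

  // N producers for at-least-once / exactly-once-beta, M for exactly-once,
  // where N = stream threads and M = assigned tasks.
  static int producerCount(ProcessingMode mode, int threads, int assignedTasks) {
    return mode == ProcessingMode.EXACTLY_ONCE ? assignedTasks : threads;
  }

  // The per-client close timeouts are inputs because they are assumptions
  // of this sketch, not values read from the Kafka clients.
  static Duration worstCaseClose(
      ProcessingMode mode,
      int threads,
      int assignedTasks,
      Duration producerClose,
      Duration consumerClose,
      Duration adminClose) {
    int producers = producerCount(mode, threads, assignedTasks);
    int consumers = 2 * threads; // 2 * N consumers in every mode
    return producerClose.multipliedBy(producers)
        .plus(consumerClose.multipliedBy(consumers))
        .plus(adminClose); // exactly 1 admin client
  }
}
```

For example, with 1 thread, 8 assigned tasks, and illustrative timeouts of 5s per producer, 30s per consumer, and 5s for the admin client, the bound is 70s under at-least-once and 105s under exactly-once.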

Also +1 on KAFKA-10585, that should be an easy fix.

@rodesai The timeout is configurable through `ksql.streams.shutdown.timeout.ms`. Is the proposal to derive the timeout from the # of threads and internal closures, thus deprecating and eliminating the `ksql.streams.shutdown.timeout.ms` config?

I took a look at the timeouts that Kafka streams is using per stream thread. These are:

  • a producer has close(Long.MAX_VALUE)
  • two consumers have close(30 seconds hard-coded)
  • an admin has close(Long.MAX_VALUE)

Deriving the timeouts is a little tricky because of the Long.MAX_VALUE values, and because the hard-coded 30s timeout could change in the future to a different value or become configurable. However, we could use the hard-coded values for now.

Of the internal closures above, the producer should close immediately: transient queries do not write anything, and producers only wait on close when flushing writes to the topic, so we can ignore that timeout. The consumers are the ones that, if rebalancing, will attempt to create the internal topics using the admin client; the admin client does not re-create any topics by itself. So we can safely derive the query timeout from the consumers only.

For that, I'll use 2 * N * 30 seconds, where N is the number of stream threads: each thread has 2 consumers, and each consumer waits up to 30s to close. The processing mode does not matter, because it only changes the number of producers, which transient queries do not use.

For example, with the default number of threads = 4, we'll wait for 240s (4 minutes).
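The 2 * N * 30 rule can be written down as a small helper. This is a sketch of the proposal only, not the code that ended up in ksqlDB. The class name `TransientQueryShutdownTimeout` and the constants are hypothetical, and the 30-second value simply mirrors the hard-coded consumer close timeout described above, which may change.

```java
import java.time.Duration;

// Hypothetical sketch of the proposed derivation: 2 consumers per stream
// thread, each allowed up to 30 seconds to close.
class TransientQueryShutdownTimeout {

  // Assumption: mirrors the hard-coded consumer close timeout in Kafka
  // Streams as described in this thread; it may differ in other versions.
  static final Duration CONSUMER_CLOSE_TIMEOUT = Duration.ofSeconds(30);

  static final int CONSUMERS_PER_THREAD = 2;

  static Duration derive(int streamThreads) {
    if (streamThreads < 1) {
      throw new IllegalArgumentException("streamThreads must be >= 1");
    }
    return CONSUMER_CLOSE_TIMEOUT.multipliedBy((long) CONSUMERS_PER_THREAD * streamThreads);
  }

  public static void main(String[] args) {
    // 2 * 4 * 30s = 240s
    System.out.println(derive(4).getSeconds());
  }
}
```

With a single stream thread the same rule gives 60s.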

@rodesai @guozhangwang Does that sound good? Should I deprecate or remove the current query timeout configuration? Or should I use it when it is set?
