ClickHouse: Data gets lost under heavy load via KafkaEngine

Created on 20 Mar 2019 · 29 comments · Source: ClickHouse/ClickHouse

We created a Yandex Tank scenario in order to test the database under heavy load.
We use KafkaEngine to populate the database. The problem is that the database starts losing data at 100 rps.

How to reproduce

  • Create a Kafka group named labeling__test_shard_localhost
  • Create a Kafka topic named labeling__all
  • Create the database schema defined in DB_SCHEMA.txt (a simplified sketch of such a pipeline is shown after this list)
  • Install Yandex Tank
  • Use the tank Python script and configuration files attached to this issue
  • Run the tank via the following command:
    sudo docker run -v $(pwd):/var/loadtest -v $SSH_AUTH_SOCK:/ssh-agent -e SSH_AUTH_SOCK=/ssh-agent --net host -it direvius/yandex-tank -c ch_kafka_conf.yaml
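
The full schema is in the attached DB_SCHEMA.txt; for readers without the attachment, a minimal sketch of what such a Kafka table -> materialized view -> MergeTree pipeline typically looks like is given below. Column names, types and the broker address are assumptions for illustration, not taken from the attachment.

    -- Minimal sketch of a KafkaEngine ingestion pipeline (hypothetical columns;
    -- the real schema is the one attached as DB_SCHEMA.txt).
    CREATE TABLE labeling_queue
    (
        gtin String,
        batch String,
        kiz String,
        event_time DateTime
    )
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'localhost:9092',
             kafka_topic_list = 'labeling__all',
             kafka_group_name = 'labeling__test_shard_localhost',
             kafka_format = 'JSONEachRow';

    -- Target table; the issue reproduces with plain MergeTree as well as
    -- VersionedCollapsingMergeTree, so the simpler engine is shown here.
    CREATE TABLE labeling_data
    (
        gtin String,
        batch String,
        kiz String,
        event_time DateTime
    )
    ENGINE = MergeTree
    ORDER BY (gtin, batch, kiz);

    -- Materialized view that moves consumed messages into the target table.
    CREATE MATERIALIZED VIEW labeling_consumer TO labeling_data
    AS SELECT gtin, batch, kiz, event_time FROM labeling_queue;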

Expected behavior
The tank executes 100 requests of 30000 records each, so SELECT count() should return 3000000 records, but you get fewer due to data loss.
To make sure that Kafka works as expected, we used the standard Kafka console consumer, and it consumes exactly 3000000 messages.
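
Assuming the hypothetical labeling_data target table from the sketch above, the verification is a plain count:

    -- 100 requests * 30000 records each = 3000000 expected rows.
    SELECT count() FROM labeling_data;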

Error message and/or stacktrace
There is nothing in the server log file that corresponds to this issue.

Additional context
We use the following software versions:

  • Kafka 2.11
  • ClickHouse 19.3.4 and the latest

DB_SCHEMA.txt
ya_tank.zip

Labels: bug, comp-kafka, st-need-info, v19.3

Most helpful comment

https://gist.github.com/k0st1an/7767a7584751a3cca8e9511b86e54465

Since version 19.3.3 a problem has appeared:

ClickHouse v19.4.3.11: 953770
ClickHouse v19.4.2.7: 996993
ClickHouse v19.4.1.3: 955197
ClickHouse v19.4.0.49: 981035
ClickHouse v19.4.0: 974905
ClickHouse v19.3.9: 977697
ClickHouse v19.3.8: 979060
ClickHouse v19.3.7: 996120
ClickHouse v19.3.6: 934678
ClickHouse v19.3.5: 962754
ClickHouse v19.3.4: 945722
ClickHouse v19.3.3: 983144
ClickHouse v19.1.9: 1000000

All 29 comments

I thought @den-crane helped you find the root of the problem in the Telegram chat?
As I understand it, the reason for the "missing" rows was wrong ORDER BY columns in VersionedCollapsingMergeTree? Or was that not the only problem?

VersionedCollapsingMergeTree is a very specific table engine. Do you have a reason to use it?
Does the issue persist if you use MergeTree?

I thought @den-crane helped you find the root of the problem in the Telegram chat?
As I understand it, the reason for the "missing" rows was wrong ORDER BY columns in VersionedCollapsingMergeTree? Or was that not the only problem?

Probably you've confused two different cases; it's another issue.
And it's probably not related to VersionedCollapsingMergeTree (though I am not sure, because I don't really know how yaTank generates unique values for the ORDER BY (gtin, batch, kiz__sign) columns).
I advised trying TSV. @aegorov05 agreed to try.

VersionedCollapsingMergeTree is a very specific table engine. Do you have a reason to use it?
Does the issue persist if you use MergeTree?

Changing the engine to a standard MergeTree does not make any difference. Instead of 3 million records we only got 2941414.

I thought @den-crane helped you find the root of the problem in the Telegram chat?
As I understand it, the reason for the "missing" rows was wrong ORDER BY columns in VersionedCollapsingMergeTree? Or was that not the only problem?

Probably you've confused two different cases; it's another issue.
And it's probably not related to VersionedCollapsingMergeTree (though I am not sure, because I don't really know how yaTank generates unique values for the ORDER BY (gtin, batch, kiz__sign) columns).
I advised trying TSV. @aegorov05 agreed to try.

We have also tried using the TSV format instead of JSONEachRow. Same result.
TSV Test.zip

VersionedCollapsingMergeTree is a very specific table engine. Do you have a reason to use it?
Does the issue persist if you use MergeTree?

Changing the engine to a standard MergeTree does not make any difference. Instead of 3 million records we only got 2941414.

So the difference is 3000000 - 2941414 = 58586, which is smaller than 65535 (by default kafka_max_block_size = max_block_size = 65536). Maybe your last block of messages has not been flushed yet? Can you recheck the result after 10 seconds of inactivity (by default stream_flush_interval_ms = 7500 ms)?

I think the Kafka engine waits for {kafka_max_block_size} rows with a blocking read and after that flushes on reaching max_insert_block_size or after 7500 ms.

@aegorov05 try to write (3000 * 65536 + 1) = 196608001 rows
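
For reference, the two server-level defaults mentioned above can be inspected directly; a small sketch, assuming nothing beyond standard system tables:

    -- Inspect the current session defaults referenced above.
    SELECT name, value
    FROM system.settings
    WHERE name IN ('max_block_size', 'stream_flush_interval_ms');

    -- kafka_max_block_size is a per-table Kafka engine setting: if it is not set
    -- explicitly in the Kafka table's SETTINGS clause (e.g. kafka_max_block_size = 65536),
    -- it falls back to max_block_size.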

VersionedCollapsingMergeTree is a very specific table engine. Do you have a reason to use it?
Does the issue persist if you use MergeTree?

Changing the engine to a standard MergeTree does not make any difference. Instead of 3 million records we only got 2941414.

So the difference is 3000000 - 2941414 = 58586, which is smaller than 65535 (by default kafka_max_block_size = max_block_size = 65536). Maybe your last block of messages has not been flushed yet? Can you recheck the result after 10 seconds of inactivity (by default stream_flush_interval_ms = 7500 ms)?

I checked the result after 10 seconds and again later. The count did not change.

@aegorov05 try to write (3000 * 65536 + 1) = 196608001 rows

Why so many? :) Any multiple of 65536 would be OK, for example 100 requests of 32768 messages each. That would be 3276800 messages in the table, which is 50 blocks of size 65536.
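
A quick way to confirm the partial-block explanation after such a run (using the hypothetical target table name from the earlier sketch):

    -- With 3276800 = 50 * 65536 messages sent, a lossless run should leave
    -- no partial block: the remainder below should be 0.
    SELECT count() AS total,
           count() % 65536 AS remainder_of_last_block
    FROM labeling_data;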

I checked the result after 10 seconds and again later. The count did not change.

Somebody was reporting issues with flushing by timeout; I need to check the code. Please ensure that the whole data appears after you fill the last block up to the required block size.

I checked the result after 10 seconds and again later. The count did not change.

Somebody was reporting issues with flushing by timeout; I need to check the code. Please ensure that the whole data appears after you fill the last block up to the required block size.

I used the standard kafka-consumer-groups tool to check the current offset. The offset is committed, LAG = 0.

@abyss7 Is there any estimate/milestone for when this bug is going to be fixed?

I had similar problems: a small number of messages was lost.
It turned out that the messages were written to Kafka with incorrect timestamps: seconds were written instead of milliseconds.
Maybe you also write seconds to the timestamps?

To check this, use kafkacat:
> kafkacat -b broker:9092 -t topic -o end -c1 -f 'timestamp: %T, message: %s'

I had similar problems: a small number of messages was lost.
It turned out that the messages were written to Kafka with incorrect timestamps: seconds were written instead of milliseconds.
Maybe you also write seconds to the timestamps?

No, we write milliseconds to the timestamp.
But I think the ClickHouse Kafka consumer shouldn't rely on message timestamps, because under heavy load there can be many messages with the same timestamp (ms or even ns).

@abyss7 Is there any estimate/milestone for when this bug is going to be fixed?

I will investigate this next week; no estimate for the fix for now.

We ran into the same issue and managed to resolve it temporarily by reducing kafka_max_block_size to 1. Could it be that the Kafka wrapper is failing to poll messages successfully with a larger batch size, but those failed messages are not being retried, leading to the data loss?
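
For anyone wanting to try the same workaround: it amounts to recreating the Kafka table with the block size forced down to a single message, roughly as below. The names and columns are the hypothetical ones from the earlier sketch, and later comments note the throughput cost.

    -- Workaround sketch: flush after every message instead of every 65536.
    -- Reported to avoid the loss, at the price of much slower consumption.
    CREATE TABLE labeling_queue
    (
        gtin String,
        batch String,
        kiz String,
        event_time DateTime
    )
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'localhost:9092',
             kafka_topic_list = 'labeling__all',
             kafka_group_name = 'labeling__test_shard_localhost',
             kafka_format = 'JSONEachRow',
             kafka_max_block_size = 1;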

We have also encountered this issue. Some messages are silently lost without anything in the logs. We have tried setting kafka_max_block_size to 1, but the speed of loading data from Kafka then drops too much for that workaround to be viable.

I think that in its current state this issue prevents using the Kafka engine in a production environment.

https://gist.github.com/k0st1an/7767a7584751a3cca8e9511b86e54465

Since version 19.3.3 a problem has appeared:

ClickHouse v19.4.3.11: 953770
ClickHouse v19.4.2.7: 996993
ClickHouse v19.4.1.3: 955197
ClickHouse v19.4.0.49: 981035
ClickHouse v19.4.0: 974905
ClickHouse v19.3.9: 977697
ClickHouse v19.3.8: 979060
ClickHouse v19.3.7: 996120
ClickHouse v19.3.6: 934678
ClickHouse v19.3.5: 962754
ClickHouse v19.3.4: 945722
ClickHouse v19.3.3: 983144
ClickHouse v19.1.9: 1000000

https://gist.github.com/k0st1an/7767a7584751a3cca8e9511b86e54465

Since version 19.3.3 a problem has appeared:

ClickHouse v19.4.3.11: 953770
ClickHouse v19.4.2.7: 996993
ClickHouse v19.4.1.3: 955197
ClickHouse v19.4.0.49: 981035
ClickHouse v19.4.0: 974905
ClickHouse v19.3.9: 977697
ClickHouse v19.3.8: 979060
ClickHouse v19.3.7: 996120
ClickHouse v19.3.6: 934678
ClickHouse v19.3.5: 962754
ClickHouse v19.3.4: 945722
ClickHouse v19.3.3: 983144
ClickHouse v19.1.9: 1000000

I discovered that about 1 row per million gets lost in ClickHouse v19.1.9. I sent 3 000 000 messages into Kafka, but got 2 999 997 in ClickHouse.

@alexm93 you may be right, but I couldn't reproduce it:

ClickHouse v19.5.2.6: 3916937
ClickHouse v19.4.3.11: 3898375
ClickHouse v19.4.2.7: 3878460
ClickHouse v19.4.1.3: 3880372
ClickHouse v19.4.0.49: 3886615
ClickHouse v19.4.0: 3907914
ClickHouse v19.3.9: 3910847
ClickHouse v19.3.8: 3873083
ClickHouse v19.3.7: 3871377
ClickHouse v19.3.6: 3855757
ClickHouse v19.3.5: 3947924
ClickHouse v19.3.4: 3877515
ClickHouse v19.3.3: 3839619
ClickHouse v19.1.9: 4000000
ClickHouse v19.1.8: 4000000
ClickHouse v19.1.7: 4000000
ClickHouse v19.1.5: 4000000
ClickHouse v18.16.1: 4000000
ClickHouse v18.16.0: 4000000
ClickHouse v18.14.19: 4000000
ClickHouse v18.14.18: 4000000
ClickHouse v18.14.17: 4000000
ClickHouse v18.14.15: 4000000
ClickHouse v18.14.14: 4000000
ClickHouse v18.14.13: 4000000
ClickHouse v18.14.12: 4000000
ClickHouse v18.14.11: 4000000
ClickHouse v18.14.10: 4000000
ClickHouse v18.14.9: 4000000

The test is awesome! I'll add that all builds on the 19.1 branch (incl. 19.1.16.79) are working correctly.

Can anyone confirm that the problem is fixed in 19.7? My test shows that data is still being lost in 19.7.

Can anyone confirm that the problem is fixed in 19.7? My test shows that data is still being lost in 19.7.

Can you please provide the setup of your test stand? I want to know the scenario so that I can reproduce it.

The setup is very simple.
I had a large Kafka topic with JSON messages.
I created 4 identical tables in ClickHouse (1 standalone server) with corresponding Kafka tables and materialized views that feed from the mentioned Kafka topic.
After consumption finished, I observed gaps in the data inserted into those 4 tables in random places. (-XXX) means the difference from the largest value for that time slot.
Note that some slots are missing exactly 10000 messages, which I believe corresponds to some internal block size.

toStartOfInterval(event_time, INTERVAL 2 hour)    browser_clicks.data1    browser_clicks.data2    browser_clicks.data3    browser_clicks.data4
------------------------------------------------  ----------------------  ----------------------  ----------------------  ----------------------
2019-05-27 11:00:00                               16721458                16721458                16721458                16721458
2019-05-27 13:00:00                               44402325                44392325 (-10000)       44392325 (-10000)       44402325
2019-05-27 15:00:00                               49636872                49636872                49636872                49636872
2019-05-27 17:00:00                               35820136 (-14117)       35834253                35834253                35824253 (-10000)
2019-05-27 19:00:00                               43301844                43294694 (-7150)        43297684 (-4160)        43301844
2019-05-27 21:00:00                               39652534 (-4482)        39654166 (-2850)        39657016                39657016
2019-05-27 23:00:00                               16238829                16238829                16238829                16238829
2019-05-28 01:00:00                               4989495                 4989495                 4989495                 4989495
2019-05-28 03:00:00                               2643050                 2643050                 2643050                 2643050
2019-05-28 05:00:00                               6543845                 6543845                 6543845                 6543845
2019-05-28 07:00:00                               30924932 (-1401)        30926333                30923877 (-2456)        30926333
2019-05-28 09:00:00                               49297574                49292770 (-4804)        49287877 (-9697)        49289918 (-7656)
2019-05-28 11:00:00                               39687344 (-581)         39683783 (-4142)        39687925                39687925
2019-05-28 13:00:00                               43923416 (-7768)        43931184                43929293 (-1891)        43929894 (-1290)
2019-05-28 15:00:00                               48901709                48897484 (-4225)        48899138 (-2571)        48901709
2019-05-28 17:00:00                               34616002                34610704 (-5298)        34612315 (-3687)        34616002
2019-05-28 19:00:00                               41605188 (-1979)        41603572 (-3595)        41607167                41599980 (-7187)
2019-05-28 21:00:00                               34555756 (-4430884)     38939622 (-47018)       38986640                38939883 (-46757)
2019-05-28 23:00:00                               14378553 (-2052894)     16426578 (-4869)        16431447                16431092 (-355)
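
The per-table counts above were presumably produced by an aggregation along these lines (a sketch; the database and table names are taken from the header row, the event_time column is assumed):

    -- Rows per 2-hour slot in one of the four target tables; running the same
    -- query against data1..data4 and diffing the results exposes the gaps.
    SELECT toStartOfInterval(event_time, INTERVAL 2 hour) AS slot,
           count() AS cnt
    FROM browser_clicks.data1
    GROUP BY slot
    ORDER BY slot;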

The first attempt to reproduce was unsuccessful: no data was lost.

After consumption finished
How did you make sure that consumption had finished on all tables? Can you provide the tail (~1000 lines) of the logs up to the moment when you check the number of messages?
Can you try the same scenario with the Kafka setting kafka_commit_every_batch=0?
Also, how many rows do you have per message?
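
kafka_commit_every_batch is a per-table Kafka engine setting, so trying the suggested value means recreating the Kafka table with it in the SETTINGS clause. A sketch with hypothetical names, topic, and column list:

    -- Sketch: commit offsets once per written block rather than per consumed batch.
    CREATE TABLE browser_clicks_queue
    (
        event_time DateTime
    )
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'localhost:9092',
             kafka_topic_list = 'browser_clicks',
             kafka_group_name = 'browser_clicks_group',
             kafka_format = 'JSONEachRow',
             kafka_commit_every_batch = 0;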

Closing for now, since there were no other reports.

I also noticed that the Kafka engine loses approximately 1% of the data. If I create another Kafka engine table and read the data from the beginning offsets again, all the data is written to the table until the lag disappears. Once the lag has disappeared, the Kafka engine starts losing data again.
The error log contains many entries like:
2019.08.29 13:20:27.778190 [ 33 ] {} StorageKafka (queue_debug): Re-joining claimed consumer after failure
2019.08.29 13:20:27.778950 [ 33 ] {} void DB::StorageKafka::streamThread(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.13.2.19
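
The re-reading trick described above is usually done by attaching a second Kafka table with a fresh consumer group and its own materialized view, then comparing row counts. A rough sketch with hypothetical names and a placeholder column list (whether the new group starts from the earliest offset depends on the consumer's offset-reset configuration):

    -- Second, throwaway pipeline that re-consumes the same topic under a new
    -- consumer group so its row count can be compared with the original table.
    CREATE TABLE queue_debug_recheck_store
    (
        payload String
    )
    ENGINE = MergeTree
    ORDER BY tuple();

    CREATE TABLE queue_debug_recheck
    (
        payload String
    )
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'localhost:9092',
             kafka_topic_list = 'debug_topic',
             kafka_group_name = 'queue_debug_recheck',
             kafka_format = 'JSONEachRow';

    CREATE MATERIALIZED VIEW queue_debug_recheck_mv TO queue_debug_recheck_store
    AS SELECT payload FROM queue_debug_recheck;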

@borismol can you please check it on 19.11.9 (just released, it has important Kafka fixes)?

@filimonov I reproduced the issue with 19.13.2.19.

@filimonov I reproduced the issue with 19.13.2.19.

It means nothing.

The only stable release with the latest Kafka fixes is 19.11.9. They are pull requests #6805 and #6597 (merged 3 days ago).

You can also pick the latest 19.14 version, for example v19.14.1.1274-prestable, or wait a few days until the Kafka fixes are ported to 19.13 as well.
