I saw many errors logged in clickhouse-server.err.log:
2019.11.06 01:16:20.394819 [ 48 ] {} <Error> void DB::StorageKafka::threadFunc(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.15.3.6 (official build)
2019.11.06 01:25:13.131197 [ 43 ] {}
Could anybody help us to avoid these issues?
Thanks
Please provide full traces, describe in detail which message format you use, and include the output of SHOW CREATE TABLE for the Kafka table.
Hi,
Thanks for your reply.
CREATE TABLE gateway_error (
ip String,
message String,
gateway String,
timestamp UInt64
) ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092',
kafka_topic_list = 'gateway_error',
kafka_group_name = 'ck_gateway_error_prod_v2',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 1,
kafka_skip_broken_messages=1;
CREATE TABLE rp_gateway_error(
ip String,
message String,
gateway String,
timestamp UInt64
) ENGINE = MergeTree()
PARTITION BY gateway
ORDER BY (gateway, timestamp)
SETTINGS index_granularity = 8192;
CREATE MATERIALIZED VIEW consumer_gateway_error TO rp_gateway_error AS SELECT * FROM gateway_error;
The rp_gateway_error table is receiving new messages, but it lags far behind the topic.
Here is the log from clickhouse-server.err.log; every large topic shows the same issue:
2019.11.06 02:10:49.373816 [ 44 ] {} <Error> void DB::StorageKafka::threadFunc(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.15.3.6 (official build)
2019.11.06 02:10:49.430406 [ 43 ] {} <Error> void DB::StorageKafka::threadFunc(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.15.3.6 (official build)
2019.11.06 02:11:34.426154 [ 55 ] {} <Error> StorageKafka (interstitial_load_failed): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:14:50.845788 [ 51 ] {} <Error> void DB::StorageKafka::threadFunc(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.15.3.6 (official build)
2019.11.06 02:14:51.037743 [ 47 ] {} <Error> void DB::StorageKafka::threadFunc(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.15.3.6 (official build)
2019.11.06 02:15:49.118264 [ 41 ] {} <Error> StorageKafka (banner_loaded): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:17:28.791548 [ 47 ] {} <Error> StorageKafka (video_requested): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:17:35.036707 [ 50 ] {} <Error> void DB::StorageKafka::threadFunc(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.15.3.6 (official build)
2019.11.06 02:24:29.030264 [ 55 ] {} <Error> StorageKafka (banner_loaded): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:24:30.565580 [ 57 ] {} <Error> StorageKafka (video_load_failed): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:27:15.142199 [ 47 ] {} <Error> StorageKafka (event): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:30:37.653232 [ 50 ] {} <Error> StorageKafka (banner_load_failed): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:33:16.658829 [ 52 ] {} <Error> StorageKafka (video_requested): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:35:02.886061 [ 50 ] {} <Error> StorageKafka (video_load_failed): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:40:55.547771 [ 57 ] {} <Error> StorageKafka (video_load_failed): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:45:00.371621 [ 57 ] {} <Error> StorageKafka (video_requested): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
2019.11.06 02:46:18.576508 [ 50 ] {} <Error> StorageKafka (banner_requested): Consumer error: Local: Maximum application poll interval (max.poll.interval.ms) exceeded
Thanks for your help.
I saw that error several times when there were more consumers than partitions in the Kafka topic.
Can you confirm that you have enough partitions for all the consumers?
You can check partition and consumer assignments with the kafka-consumer-groups command-line tool.
PS. Probably we can/should try to report that situation in a more intelligent way.
Hi,
Here is the kafka-consumer-groups output. I checked and confirmed that all ClickHouse Kafka tables use the kafka_num_consumers = 1 setting.
dingjianshengdeMacBook-Pro:bin ding$ ./kafka-consumer-groups.sh --bootstrap-server mykafkaserver:9092 --describe --group ck_event_prod_v3 --offsets
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
event 0 107898981 123735717 15836736 ClickHouse 19.15.3.6-64a65586-5e00-43e0-95b0-61e686c1fe6f /--.--.--.-- ClickHouse 19.15.3.6
In this situation, the lag becomes larger every day. Are there any options to make ClickHouse work harder to catch up with the Kafka topics?
Thanks for your help.
According to your logs, the issue is a message load that is too heavy: CH can't keep up with the incoming messages between poll intervals, and the consumers get dropped. They also need time to reconnect again later. You can try tuning the stream_poll_timeout_ms setting to a greater value.
@abyssz Can you at least report which table/topic is relevant to this message:
2019.11.06 01:16:20.394819 [ 48 ] {} void DB::StorageKafka::threadFunc(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.15.3.6 (official build)
Here is the kafka-consumer-groups output. I checked and confirmed that all ClickHouse Kafka tables use the kafka_num_consumers = 1 setting.
How many tables / servers do you have that try to read that topic? Is it really only one table on one server, as the kafka-consumer-groups.sh output suggests? Or do you have just a single partition in your topic and try to consume it with multiple servers / tables?
Did you consider adding more partitions to your topic to make it possible to consume them concurrently?
Only one table on one server reads that topic.
We had to move to a sinker program to consume those big topics (3000 messages per second now) into ClickHouse, and that works now.
As we saw, the Kafka engine only polls for 5 seconds and then leaves the consumer group, which makes the lag bigger and bigger.
polls for 5 seconds and then leaves the consumer group
It should not leave the consumer group; it only exits the group explicitly in case of an error (what kind of error that was needs to be checked).
The Kafka engine can consume about 10k-200k rows per second (depending on the format used, the size of the rows, and the hardware).
According to your logs, the issue is a message load that is too heavy: CH can't keep up with the incoming messages between poll intervals, and the consumers get dropped. They also need time to reconnect again later. You can try tuning the stream_poll_timeout_ms setting to a greater value.
Hi, could you please be more specific about the following statements?
Thank you for your time and effort.
P.S. We use CH in production and we are almost exhausted trying to get CH and Kafka to work together.
If you get a message about max.poll.interval.ms being exceeded, it means there was too big a delay between two consecutive consume calls.
I.e. either it takes too much time to flush the previously consumed data to disk, or all the background threads from the pool were busy in the meantime (that can happen, for example, if you have a lot of Kafka tables, or set a big num_consumers for each of them).
ClickHouse currently uses quite simple error processing for Kafka. On every error it leaves the consumer group, and coming back alive can take some time.
We have several improvements scheduled for the next 2 months.
Can you make messages like this one more descriptive?
2019.11.29 05:05:47.154529 [ 37 ] {} <Error> void DB::StorageKafka::threadFunc(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.16.3.6
What does this message tell me? What should I do if I see it?
Another question regarding setting Kafka parameters as described at https://clickhouse.yandex/docs/en/operations/table_engines/kafka/:
If I set a parameter, how can I check its current value?
For example, if I set the parameter
<kafka>
<max_poll_interval_ms>20000</max_poll_interval_ms>
</kafka>
How can I check whether it is set to 20000 or not?
Do I have to restart the ClickHouse server or not?
How can I be sure that the parameter has been applied?
"max.poll.interval.ms" is most common error when dealing with kafka streams using processor api.
In other words long running message "handlers/punctuators" could trigger this. I am trying to say this is perfectly OK and logical for re-balancing purposes.
What I don't understand is kafka threading model in CH.
The defaults are as follows (5 minutes, this is pretty much high value.)
max.poll.interval.ms -> 300000 (Maximum allowed time between calls to consume messages)
(https://raw.githubusercontent.com/edenhill/librdkafka/master/CONFIGURATION.md)
Thank you for your time and effort.
Can you make messages like this one more descriptive?
Local: Timed out
I have that problem noted already. I will see what we can do / how we can improve it.
If I set a parameter, how can I check its current value?
AFAIK there is no way to do that currently.
I will check if it's possible to show the effective values. I'll add that to my TODO list (but it's already quite long, so I'm not sure when I will get to it).
Do I have to restart the ClickHouse server or not?
It will not work 'on the fly', but I think it should be enough to detach / attach the table.
You can also enable rdkafka logging to debug / understand what is actually going on. There is a sample in the docs, for example <debug>all</debug>. It will write the rdkafka log to the /var/log/clickhouse-server/stdout file (or to the docker logs if you use docker).
@filimonov Thanks for the answer. Can you please answer https://github.com/ClickHouse/ClickHouse/issues/7646#issuecomment-571939568 as well?
Does CH have a dedicated thread for Kafka processing, or is a thread pool used (a common static thread pool)?
The 'schedule' thread pool is used for that; its size is configured by the background_schedule_pool_size setting.
You can check the actual usage of the pool by looking at the system.metrics metric named BackgroundSchedulePoolTask, and its memory usage in MemoryTrackingInBackgroundSchedulePool. That pool is shared by several different background tasks. Check the sources for details.
I'm also thinking about a better solution for that, but for now I have no clear plan.
Buffer uses a separate thread for flushing.
max.poll.interval.ms -> 300000 (Maximum allowed time between calls to consume messages)
Yes, if you hit that error, it would be nice if you could explain the details of your setup / use case and any non-default settings, so I can try to create a test case from that, reproduce the problem, and try to fix/improve it.
Does CH have a dedicated thread for Kafka processing, or is a thread pool used (a common static thread pool)?
The 'schedule' thread pool is used for that; its size is configured by the background_schedule_pool_size setting. You can check the actual usage of the pool by looking at the system.metrics metric named BackgroundSchedulePoolTask, and its memory usage in MemoryTrackingInBackgroundSchedulePool. That pool is shared by several different background tasks. Check the sources for details.
Actually, we haven't managed to change this value, and we haven't found it in the official docs.
Could you please suggest how to do it: in which file and under which key should we put this property?
config.xml looks like this at the end:
<yandex>
.....
<stream_flush_interval_ms>60000</stream_flush_interval_ms>
<max_insert_block_size>10485760</max_insert_block_size>
<max_threads>20</max_threads>
<max_block_size>65536</max_block_size>
<fsync_metadata>0</fsync_metadata>
<stream_poll_timeout_ms>30000</stream_poll_timeout_ms>
<kafka_max_wait_ms>20000</kafka_max_wait_ms>
<background_schedule_pool_size>25</background_schedule_pool_size>
<kafka>
<enable_auto_commit>true</enable_auto_commit>
<auto_commit_interval_ms>1000</auto_commit_interval_ms>
<max_poll_interval_ms>600000</max_poll_interval_ms>
</kafka>
</yandex>
I'm also thinking about a better solution for that, but for now I have no clear plan.
Buffer uses a separate thread for flushing.
max.poll.interval.ms -> 300000 (Maximum allowed time between calls to consume messages)
The default value is 5 minutes, and you have stated the following: "Buffer uses a separate thread for flushing."
So, what could cause 5-minute delays between two consecutive poll requests?
Yes, if you hit that error, it would be nice if you could explain the details of your setup / use case and any non-default settings, so I can try to create a test case from that, reproduce the problem, and try to fix/improve it.
We have a 10-node cluster (5 shards, replication factor 1).
There are no significant non-default settings.
One important detail is related to periodic high I/O waits.
The system is deployed on top of virtual machines (VMware).
This is something that we cannot change, unfortunately.
We ingest up to 1 billion events per day.
Kafka broker: 2.3.1 stable
ClickHouse client version 19.16.3.6.
Connected to ClickHouse server version 19.16.3 revision 54427.
Really appreciate your help.
Thank you very much for your time.
Regards,
Ivan
@filimonov Can you please answer @ivanprostran's last question?
users.xml (select * from system.settings)
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<background_schedule_pool_size>25</background_schedule_pool_size>
also in users.xml:
<max_insert_block_size>10485760</max_insert_block_size>
<max_threads>20</max_threads>
<max_block_size>65536</max_block_size>
<kafka_max_wait_ms>20000</kafka_max_wait_ms>
config.xml:
<stream_flush_interval_ms>60000</stream_flush_interval_ms>
<fsync_metadata>0</fsync_metadata>
<stream_poll_timeout_ms>30000</stream_poll_timeout_ms>
@ivanprostran did you check the BackgroundSchedulePoolTask metric in system.metrics? Are you really close to the top?
Settings:
SELECT *
FROM system.settings
WHERE name = 'background_schedule_pool_size'
┌─name──────────────────────────┬─value─┬─changed─┬─description───────────────────────────────────────────────────────────────────────────────────────────────┐
│ background_schedule_pool_size │ 16    │       0 │ Number of threads performing background tasks for replicated tables. Only has meaning at server startup. │
└───────────────────────────────┴───────┴─────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Current state (varies)
SELECT *
FROM system.metrics
WHERE metric = 'BackgroundSchedulePoolTask'
┌─metric─────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ BackgroundSchedulePoolTask │    13 │ Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc. │
└────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
1 rows in set. Elapsed: 0.002 sec.
ngasp01 :) select * from system.metrics where metric='BackgroundSchedulePoolTask'
SELECT *
FROM system.metrics
WHERE metric = 'BackgroundSchedulePoolTask'
┌─metric─────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ BackgroundSchedulePoolTask │    15 │ Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc. │
└────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Thank you for your help and time.
Finally, we've managed to change the settings (in both users.xml and config.xml).
Thank you very much!
background_schedule_pool_size has been increased to 50:
SELECT *
FROM system.settings
WHERE name = 'background_schedule_pool_size'
┌─name──────────────────────────┬─value─┬─changed─┬─description───────────────────────────────────────────────────────────────────────────────────────────────┐
│ background_schedule_pool_size │ 50    │       1 │ Number of threads performing background tasks for replicated tables. Only has meaning at server startup. │
└───────────────────────────────┴───────┴─────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────────┘
And now we have the following:
SELECT
metric,
value
FROM system.metrics
WHERE metric = 'BackgroundSchedulePoolTask'
┌─metric─────────────────────┬─value─┐
│ BackgroundSchedulePoolTask │    19 │
└────────────────────────────┴───────┘
@ivanprostran how many Kafka tables do you have? Did it solve the issue with the timeouts?
Hi,
There are 5 tables.
The input topics have 10 partitions.
Each node consumes from a single partition (10 nodes, 5+5, replication factor 1).
Unfortunately, the errors are still present (possibly reduced a little bit):
2020.01.16 15:23:27.393222 [ 69 ] {} <Error> void DB::StorageKafka::threadFunc(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.16.3.6
2020.01.17 02:56:42.891898 [ 53 ] {} <Error> void DB::StorageKafka::threadFunc(): std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 19.16.3.6
2020.01.17 08:52:28.771024 [ 47 ] {} <Error> StorageKafka (bitrate_change_kafka): Consumer error: Local: Broker transport failure
To be sure, here are the new settings:
config.xml
<stream_poll_timeout_ms>30000</stream_poll_timeout_ms>
<kafka>
<enable_auto_commit>true</enable_auto_commit>
<auto_commit_interval_ms>1000</auto_commit_interval_ms>
<max_poll_interval_ms>600000</max_poll_interval_ms>
</kafka>
users.xml
<!-- Default settings. -->
<default>
<max_threads>50</max_threads>
<background_schedule_pool_size>50</background_schedule_pool_size>
<kafka_max_wait_ms>20000</kafka_max_wait_ms>
</default>
Our assumption is that the Kafka errors (i.e., unstable ClickHouse Kafka consumers) cause duplicate entries (we know that transactions are not supported yet).
This is why we introduced the following configuration, but we are not sure whether it helps or not:
<enable_auto_commit>true</enable_auto_commit>
<auto_commit_interval_ms>1000</auto_commit_interval_ms>
Thank you for your time and effort.
Regards,
Ivan