I was curious if there's a link to a layman's explanation of the specific steps of how insertion works.
I read the architecture guide here: https://clickhouse.yandex/docs/en/development/architecture/
But it doesn't answer things like "after the data's inserted, when's it available to query?" I assume, for example, that the data isn't available until after it's been sorted and committed to disk.
I'm asking because we're observing data which is hours behind what we expect it to be, despite the consumer writing the most up-to-date data into the cluster successfully.
@abraithwaite there are different options for how to do insertion, with various trade-offs. The major choice is whether to write to a Distributed table or directly to the shards (the latter is harder to implement but works better); then there are settings like insert_quorum that let you wait for the data to be written to more than one replica.
How exactly do you do inserts at the moment?
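For reference, the two options look roughly like this in SQL; the table and column names here are hypothetical, used only for illustration:

```sql
-- Option 1: INSERT through the Distributed table. The receiving node queues the
-- rows locally and forwards them to the shards asynchronously by default.
INSERT INTO global_raw_events (dt, user_id, payload) VALUES (now(), 42, 'example');

-- Option 2: INSERT directly into the shard-local ReplicatedMergeTree table.
-- The client has to pick the shard itself, but the rows are queryable on that
-- replica as soon as the INSERT returns.
INSERT INTO raw_events_local (dt, user_id, payload) VALUES (now(), 42, 'example');
```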
We are doing inserts with the Go client from a Kafka topic. We write in batches of 10-20k records every 1-2 seconds. The number of parts grew past parts_to_delay_insert and was hovering around 200 parts for the most recent partition of data.
After stopping our workers entirely, I noticed that inserts were still happening. This suggests that when writing to ClickHouse with the Go client, the data is copied to a local file somewhere and the actual insert happens some time afterwards; we won't see the data in the table until after that insert happens. Is that correct? Is a successful write just "it's been received by the server and is on disk", or is there a way to configure ClickHouse to delay the write acknowledgement until the data has actually been inserted into the table?
Reading the insert_quorum docs, what happens when the value is 1? We don't want to wait for at least 2 servers to ack the write, but we do want at least 1 to successfully insert it into the table before responding to the client.
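As an aside on the parts_to_delay_insert pressure: the active part count per partition can be watched with a query against system.parts on each server. A rough sketch, with hypothetical database and table names:

```sql
-- Active parts per partition on this server, to compare against
-- the parts_to_delay_insert threshold mentioned above.
SELECT partition, count() AS active_parts
FROM system.parts
WHERE database = 'default' AND table = 'raw_events_local' AND active
GROUP BY partition
ORDER BY partition DESC;
```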
@abraithwaite it sounds like you are inserting into a table with the Distributed engine. You can avoid the behaviour you describe in the second paragraph by using it only for SELECTs and doing INSERTs directly to the shards (tables with the ReplicatedMergeTree engine).
> Reading the insert_quorum docs, what happens when the value is 1?
It's the default; basically you have a low chance of losing some data (maybe forever) if that one replica dies immediately after responding "ok" to the client doing the insert.
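If you want the INSERT acknowledgement to mean that more than one replica has the data, a session-level setting along these lines should do it (a sketch; the table and columns are hypothetical, and insert_quorum only applies to Replicated* tables):

```sql
-- With insert_quorum = 2 the INSERT only returns "ok" once the data is confirmed
-- on two replicas, so losing a single replica right after the acknowledgement
-- no longer loses the batch.
SET insert_quorum = 2;
INSERT INTO raw_events_local (dt, user_id, payload) VALUES (now(), 42, 'example');
```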
Gotcha. So for distributed writes, since the data can end up on any node, ClickHouse buffers the data instead of streaming it to the ReplicatedMergeTree tables.
I'm guessing insert_quorum doesn't affect Distributed table inserts either? FWIW, the docs say the default for insert_quorum is 0, not 1. 1 makes sense, but given my problem I thought that maybe with 0, it copies the data locally first before being written.
Thanks @blinkov! I think it would still be helpful for implementers to have a reference covering the exact timeline for inserts, at least for the MergeTree, ReplicatedMergeTree and Distributed storage types.
If we're not inserting into the Distributed table, do we have to write to both replicas of the shard ourselves (since internal_replication is a Distributed table setting)?
The Distributed table docs make it sound like we will have to write to both replicas and the ReplicatedMergeTree won't copy over the data itself if we write to one of the replicas but not both.
https://clickhouse.yandex/docs/en/operations/table_engines/distributed/
A better question might be: how do we appropriately monitor Distributed table insertions? As far as I can tell, there are no settings to adjust or metrics to watch to tune and monitor Distributed table insertion lag.
We've set up our own (it just does select max(dt) from global_raw_events and compares that value to the wall time), but it doesn't help us figure out what we need to change to bring it down. We don't have a ton of spare CPU cycles, but enough that I believe these inserts should be prioritized over merges.
Our active part count per partition is well under the limits (hovering around 40-50 for the most recent partition, which is actively being inserted into), so we're at a loss for how to lower the delta between the client writing the data and ClickHouse's insertion into the table.
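For what it's worth, that lag check can be expressed as a single query (assuming dt is a DateTime column):

```sql
-- Rough end-to-end lag in seconds: wall clock minus the newest row visible
-- through the Distributed table.
SELECT now() - max(dt) AS lag_seconds FROM global_raw_events;
```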
Found the "DelayedInserts" metric which seems to be for distributed tables. That's helpful at least :-)
> The Distributed table docs make it sound like we will have to write to both replicas and the ReplicatedMergeTree won't copy over the data itself if we write to one of the replicas but not both.
There are two different approaches:

> Each shard can have the 'internal_replication' parameter defined in the config file. If this parameter is set to 'true', the write operation selects the first healthy replica and writes data to it. Use this alternative if the Distributed table "looks at" replicated tables. In other words, if the table where data will be written is going to replicate them itself.

In case of data load without Distributed, you MUST insert into only one replica of the Replicated table.
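To make the two approaches concrete, here is a hedged sketch of the usual layout; the cluster name, ZooKeeper path, and table/column names are hypothetical, and internal_replication itself is set in the cluster definition in config.xml rather than in the DDL:

```sql
-- On every replica of every shard: the local table that actually stores the data.
-- With internal_replication = true in the cluster config you insert into one
-- replica only, and ReplicatedMergeTree copies the parts to the others itself.
-- {shard} and {replica} come from the macros section of the server config.
CREATE TABLE raw_events_local
(
    dt DateTime,
    user_id UInt64,
    payload String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/raw_events', '{replica}')
PARTITION BY toYYYYMMDD(dt)
ORDER BY (dt, user_id);

-- On the nodes used for queries: a Distributed table that fans SELECTs (and,
-- optionally, INSERTs) out across the cluster.
CREATE TABLE global_raw_events AS raw_events_local
ENGINE = Distributed(my_cluster, default, raw_events_local, rand());
```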
Questions were answered. Was just hard to find in the docs. Thanks @den-crane and @blinkov for the help!