We have a table with this structure:
id VARCHAR, Timestamp DateTime, v0 Nullable(DOUBLE), v1 Nullable(DOUBLE),...
The key is (id,Timestamp).
We receive our data v0,v1,... in several parts, so inserted rows are not complete, but we want the final result to be complete rows. For example:
Inserted:
```
id   Timestamp  v0    v1    v2    v3
====================================
id1  ts1        1     1     null  null
id1  ts1        null  null  2     2
```
Expected result:
```
id   Timestamp  v0    v1    v2    v3
====================================
id1  ts1        1     1     2     2
```
We also want to be able to update values, so we added a 'version' column. The rule we want to implement is "keep the latest non-null value for each column".
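For reference, a minimal sketch of the schema described above (the engine shown is just one of the variants we tried; names and exact types are illustrative):
```sql
-- minimal sketch of the schema described above (illustrative, not our exact DDL)
CREATE TABLE data
(
    id String,                -- VARCHAR is an alias for String in ClickHouse
    Timestamp DateTime,
    version UInt64,           -- added to support updates
    v0 Nullable(Float64),
    v1 Nullable(Float64),
    v2 Nullable(Float64),
    v3 Nullable(Float64)
)
ENGINE = ReplacingMergeTree(version)
ORDER BY (id, Timestamp);
```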
We tried different solutions, but none is really perfect.
The ideal solution for us would be to have an 'ignoreNullValues' option in ReplacingMergeTree so that, instead of just keeping the latest row, the engine replaces a value only if the new value is not null (for each column).
Is this something that could be added?
I am working on a patch to SummingMergeTree to solve our use case described above. The idea is to specify a custom aggregation function (here it would be 'anyLast') to be used instead of 'sumWithOverflow'.
I have a working prototype, but there are some issues.
So my question is: is it a good approach to add these options to the SummingMergeTree?
An alternative would be to start a new MergeTree variant, but is there a chance it would be accepted into the official sources?
You could also try AggregatingMergeTree with the anyLastIf function.
```sql
create table qwerty
engine=AggregatingMergeTree
order by (id)
partition by tuple()
as select 1 as id, anyLastStateIf(toNullable(''),0) str, anyLastStateIf(toNullable(100),0) num;
insert into qwerty select 1 as id, anyLastStateIf(toNullable('abc'),0) str, anyLastStateIf(toNullable(200),1) num;
-- can work as materialized view
insert into qwerty select 1 as id, anyLastStateIf(str,str is not NULL), anyLastStateIf(num, num is not null) from ... ;
select id, anyLastMerge(str), anyLastMerge(num) from qwerty group by id;
```
Yes, we already tried this solution, but we were disappointed by its performance. I guess it is because we used a view containing the anyLastMerge, and enable_optimize_predicate_expression was disabled at that time. So as soon as we queried the view with criteria, it performed badly.
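For illustration, such a view over the earlier qwerty example would look roughly like this (the view name is made up):
```sql
-- sketch of the kind of view we used: the -Merge aggregation runs on every query
CREATE VIEW qwerty_view AS
SELECT id, anyLastMerge(str) AS str, anyLastMerge(num) AS num
FROM qwerty
GROUP BY id;
```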
I'll try to produce some benchmark results, including compression ratio, but my general impression was that the AggregatingMergeTree was too heavy for this kind of very simple aggregation.
> AggregatingMergeTree was too heavy for this kind of very simple aggregation.
I once compared SummingMergeTree with AggregatingMergeTree and did not find any significant difference in performance, but it wasn't an accurate and proper benchmark.
Also, the stored state for the anyLast function is just a value (without any extras).
> I'll try to produce some benchmark results, including compression ratio
That would be great!
The possibility to specify skip_nulls for ReplacingMergeTree looks fine from the user's perspective (even when also considering a skip_empty_arrays option), except that the current implementation always replaces a row as a whole, not single values. The implementation will probably be more complicated.
The possibility to specify custom aggregate functions for SummingMergeTree is more complicated from the user's perspective. How would it be specified in the table definition? Maybe we would have to invent custom syntax to specify aggregate functions per column?
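For example (purely hypothetical syntax, just to make the question concrete; nothing like this exists today):
```sql
-- invented syntax sketch: per-column aggregate functions for SummingMergeTree
CREATE TABLE t
(
    id String,
    ts DateTime,
    v0 Nullable(Float64),
    v1 Nullable(Float64)
)
ENGINE = SummingMergeTree()  -- today this only accepts a list of columns to sum
ORDER BY (id, ts)
SETTINGS column_aggregate_functions = 'v0=anyLast,v1=max';  -- invented setting
```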
I performed a benchmark with different storage solutions (ReplacingMergeTree, SummingMergeTree and AggregatingMergeTree):
I measured the insertion time, the storage size and the time taken by a few queries chosen based on our use case. The table has 100 numeric Nullable(Float64) columns.
It would be too long to get into all the details of the benchmark, but here are my main conclusions:
1) insertion time
ReplacingMergeTree suffers a lot from the need to reload old values, as expected. AggregatingMergeTree is 70% slower than SummingMergeTree.
2) storage size
Storage size is more or less equivalent between the 3 solutions.
3) request time
Using FINAL on ReplacingMergeTree and SummingMergeTree is very costly, so we decided to run our queries without FINAL and deal with the approximate results. This works quite well because the last inserted data usually doesn't remain unmerged for very long. We also use OPTIMIZE to force this merge after some inactivity time (see the sketch below).
Queries on AggregatingMergeTree are between 40% and 3.5x slower than the other two, depending on the criteria.
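For reference, the forced merge mentioned above is simply:
```sql
-- force a merge of the remaining parts after some inactivity,
-- so that queries without FINAL see fully merged rows ('data' as in our examples)
OPTIMIZE TABLE data FINAL;
```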
BTW I noticed that enable_optimize_predicate_expression doesn't push a LIMIT clause down to the underlying table. This is very annoying because we get "SELECT * FROM data LIMIT 1000" queries coming from our integration with Tableau. Tableau sends this query when the user wants to preview a new datasource, and it usually ends with a "memory limit exceeded" error or a timeout on the ClickHouse side.
To sum up, in our use case, an engine which merges blocks with the anyLast function would be the optimal solution.
I see 2 ways to implement this:
1) the easiest is what I did: add an option to SummingMergeTree (for example SummingMergeTree('anyLast')). This defines a function which replaces the default 'sumWithOverflow', but it looks a little hackish and is not very flexible.
2) a new engine, which could be called SimpleAggregatingMergeTree for example, with column definitions like SimpleAggregateFunction(anyLast, Nullable(String)). Such a column stores Nullable(String) values, and the aggregate function is used when blocks are merged. It remains to be confirmed that this is feasible.
What do you think of these solutions? Does it look like something that could become an official feature?
I've just started discovering the ClickHouse sources and I'm not a C++ expert, but I could try working on it.
@bgranvea When you SELECT from AggregatingMergeTree (in contrast to ReplacingMergeTree), you usually do additional aggregation with -Merge aggregate functions.
As an alternative, you can try selecting partially aggregated values without final aggregation by using the finalizeAggregation function. This is an ordinary function that takes an aggregation state as an argument and returns the finalized result.
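A minimal sketch using the qwerty table from the example above:
```sql
-- finalize each stored state directly, without GROUP BY / -Merge;
-- rows from not-yet-merged parts are finalized independently
SELECT id, finalizeAggregation(str) AS str, finalizeAggregation(num) AS num
FROM qwerty;
```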
I have to run more benchmarks, but performance with finalizeAggregation looks interesting.
I see a couple of drawbacks with this approach, however:
1) NULL values seem to be lost and replaced by a "default" value (0 or an empty string). Is this a bug/limitation of finalizeAggregation that could be improved, or is it the way AggregatingMergeTree stores the state?
2) SELECT performance seems quite good, but INSERT is more than 2x slower than inserting into a SummingMergeTree, for example.
3) we manage tables with dynamic structures; this approach makes that process more complex, as we need to manage 3 tables (Null engine, materialized view and final view).
To mitigate 2) and 3), wouldn't it be possible to use the existing AggregatingMergeTree with a new function SimpleAggregateFunction, so that the stored state is the value itself with its real type? This could be used only in a few limited cases like any, anyLast, min, max.
Just to be sure that anyLast is the right answer to our needs: if I have an AggregatingMergeTree like this:
```sql
CREATE MATERIALIZED VIEW data_aggr
ENGINE = AggregatingMergeTree
PARTITION BY toStartOfQuarter(ts)
ORDER BY (ts, id)
AS SELECT
    ts, id,
    anyLastState(v0) v0, anyLastState(v1) v1,
    anyLastState(p0) p0, anyLastState(p1) p1
FROM data_null
GROUP BY ts, id
```
Can I be sure when blocks are merged that rows with the same (ts,id) are taken in their natural insertion order, so that anyLast really selects the last inserted row?
I suppose this cannot be guaranteed when merging the results of distributed queries, but is it the case locally on each node?
> Can I be sure when blocks are merged that rows with the same (ts,id) are taken in their natural insertion order, so that anyLast really selects the last inserted row?
Yes, it is guaranteed for background merges. Data parts are selected for merge in contiguous intervals of insertion order and merge sorting merges identical keys in their insertion order.
CollapsingMergeTree and ReplacingMergeTree rely on that order.
However, the order is not guaranteed for SELECTs (unless you use FINAL).
> To mitigate 2) and 3), wouldn't it be possible to use the existing AggregatingMergeTree with a new function SimpleAggregateFunction, so that the stored state is the value itself with its real type? This could be used only in a few limited cases like any, anyLast, min, max.
It is a very reasonable solution: allow states of simple aggregate functions to be stored as ordinary columns, just as in SummingMergeTree.
(But it will break backwards compatibility of existing AggregatingMergeTree tables if we do it transparently.)
> It is a very reasonable solution: allow states of simple aggregate functions to be stored as ordinary columns, just as in SummingMergeTree.
> (But it will break backwards compatibility of existing AggregatingMergeTree tables if we do it transparently.)
That's why I propose a new function SimpleAggregateFunction to implement this new behavior.
Otherwise, it could be an option of AggregatingMergeTree, so that this optimization is done transparently only when possible.
Do you mean a new data type "SimpleAggregateFunction"?
Just to chime in on the subject: we have a similar problem that could be improved by this functionality. Running conditional queries on rapidly mutating rows is kind of a hassle at the moment. As far as I understand, there are a few options (consider a ReplacingMergeTree with id as the primary key):
1 - Use HAVING: `SELECT id FROM table GROUP BY id HAVING anyLast(col) = 'something'`
2 - Use FINAL: `SELECT id FROM table FINAL WHERE col = 'something'`
3 - Use a materialized AggregatingMergeTree view with anyLastState(col) as col: `SELECT id FROM mat_view WHERE finalizeAggregation(col) = 'something'`
None of these feels natural: 1 has horrible performance, 2 kind of throws CH's potential awesomeness out the window, and 3 is difficult to maintain with occasional column alters, not to mention the omission of NULL values as @bgranvea stated. If something like a SimpleAggregateFunction data type could provide direct insert and select interaction with the user, that would be great. An entirely different engine that focuses on abstracting latest-value operations from the user would be equally useful.
> Do you mean a new data type "SimpleAggregateFunction"?
Yes, sorry, I was not very clear. A new data type, very similar to AggregateFunction.
I have a first working version:
```sql
create table data_test
(
    ts DATETIME,
    id VARCHAR,
    v0 SimpleAggregateFunction(anyLast, Nullable(Float64)),
    v1 SimpleAggregateFunction(sum, Nullable(Float64)),
    v2 SimpleAggregateFunction(min, Nullable(Float64)),
    v3 SimpleAggregateFunction(max, Nullable(Float64)),
    p0 SimpleAggregateFunction(anyLast, Nullable(VARCHAR))
)
ENGINE=AggregatingMergeTree
PARTITION BY toStartOfQuarter(ts)
ORDER BY (ts, id);

insert into data_test values('2019-01-01 00:00:00','id1',null,null,null,null,null);
insert into data_test values('2019-01-01 00:00:00','id2',1,1,1,1,'str1');
insert into data_test values('2019-01-01 00:00:00','id2',3,3,3,3,'str3');

select * from data_test;
```
```
┌──────────────────ts─┬─id──┬───v0─┬───v1─┬───v2─┬───v3─┬─p0───┐
│ 2019-01-01 00:00:00 │ id1 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │
│ 2019-01-01 00:00:00 │ id2 │    1 │    1 │    1 │    1 │ str1 │
│ 2019-01-01 00:00:00 │ id2 │    3 │    3 │    3 │    3 │ str3 │
└─────────────────────┴─────┴──────┴──────┴──────┴──────┴──────┘
```
```sql
select * from data_test final;
```
```
┌──────────────────ts─┬─id──┬───v0─┬───v1─┬───v2─┬───v3─┬─p0───┐
│ 2019-01-01 00:00:00 │ id1 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │
│ 2019-01-01 00:00:00 │ id2 │    3 │    4 │    1 │    3 │ str3 │
└─────────────────────┴─────┴──────┴──────┴──────┴──────┴──────┘
```
I'm not sure if the implementation is right, though. Here is what I did:
- added a DataTypeSimpleAggregateFunction which takes the return type of the aggregate function (for example Nullable(Float64)) and delegates all the methods to the corresponding DataType. This allows selecting/inserting directly with the underlying data type.
- modified AggregatingSortedBlockInputStream so that it recognizes SimpleAggregateFunction columns and manages these aggregations the same way SummingSortedBlockInputStream does.
- there are different places where I need to unwrap the nested type (for example SimpleAggregateFunction(Nullable(Float64)) => Nullable(Float64)). This is very similar to what is done for LowCardinality.
Is this the right approach?
I have difficulties when data types are recursively nested, for example SimpleAggregateFunction(Nullable(Float64)). You could also imagine defining a column of type SimpleAggregateFunction(LowCardinality(Nullable(Float64))) (not tested yet).
There are multiple locations in the code where the data type is directly tested with typeid_cast<const DataTypeLowCardinality *> or typeid_cast<const DataTypeNullable *>, which obviously doesn't work, as I first have to unwrap the type nested in SimpleAggregateFunction.
Do you see a simple solution for this?
I am wondering if I could unwrap SimpleAggregateFunction at an early stage of the processing, so that the rest of the processing remains as it is today. For example in ExpressionAnalyzer or ExpressionActions?
I've noticed that a class IDataTypeDomain was introduced for the IPv4 and IPv6 data types in order to override the name of a DataType (it can also override serialize/deserialize functions, but I'm not interested in that).
My idea is to use it so that a SimpleAggregateFunction(anyLast, Nullable(Float64)) column type is transformed into a DataTypeNullable with a custom IDataTypeDomain containing information about the aggregate function.
I know this was not the intended use, but it seems to be the simplest way to ensure that my data type behaves like a standard one without modifying every place where there is a test for DataTypeNullable, DataTypeLowCardinality...
I don't see another way to attach extra information to existing data types, since inheritance is forbidden, without adding new functions to IDataType.