Describe the bug
I wanted to create a table that joins several rows into one, collecting a single column into an array with COLLECT_LIST. I also wanted the table to be keyed by a single column while keeping the other columns visible, so a large GROUP BY on all the columns wasn't an option.
I figured the aggregate function LATEST_BY_OFFSET was the closest to what I wanted, as it would return the latest value of each column, even though those values would always be the same. However, I ran the following statement:
CREATE TABLE ALBUMS AS
SELECT LATEST_BY_OFFSET(id) AS id, LATEST_BY_OFFSET(title) AS title, upc, LATEST_BY_OFFSET(release_date) AS release_date,
COLLECT_LIST(artist) AS artists
FROM ALBUMS_SRC
GROUP BY upc;
And got the following error:
Non-aggregate SELECT expression(s) not part of GROUP BY: [LATEST_BY_OFFSET(ALBUMS_SRC.ID), LATEST_BY_OFFSET(ALBUMS_SRC.RELEASE_DATE), LATEST_BY_OFFSET(ALBUMS_SRC.TITLE)]
Ideally I'd be able to group by one column and use non-aggregated columns in the same query. I thought using an aggregate function in their place would work here, but it appears LATEST_BY_OFFSET isn't considered to produce an aggregated result in this case.
Using TOPK does work, however:
CREATE TABLE ALBUMS AS
SELECT TOPK(id, 1)[1] AS id, TOPK(title, 1)[1] AS title, upc, TOPK(release_date, 1)[1] AS release_date, \
COLLECT_LIST(artist) AS artists \
FROM ALBUMS_SRC \
GROUP BY upc;
To Reproduce
Steps to reproduce the behavior, include:
Sample queries above.
Sample data (note multiple rows can exist for a single album representing each artist):
id,title,upc,release_date,artist
2245,Chinampa,191515612908,17396,"0|5151|El Búho|1|performer"
2515,Twilight Zone,3700604727675,18355,0|4248|Pumpkin|1|performer
2515,Twilight Zone,3700604727675,18355,"1|4249|Vin'S da Cuero|1|performer"
2508,"Mes anciens",3700604727606,18355,0|4923|Keusty|1|performer
2508,"Mes anciens",3700604727606,18355,"1|5481|Tom Dettinger|0|producer"
2508,"Mes anciens",3700604727606,18355,"2|5482|Noham Saad-Saoud|0|producer"
2508,"Mes anciens",3700604727606,18355,"3|5483|Chinhan Trieu|0|producer"
Expected behavior
I expect the table to be created without error.
Actual behaviour
I get the following error:
Non-aggregate SELECT expression(s) not part of GROUP BY: [LATEST_BY_OFFSET(ALBUMS_SRC.ID), LATEST_BY_OFFSET(ALBUMS_SRC.RELEASE_DATE), LATEST_BY_OFFSET(ALBUMS_SRC.TITLE)]
Tried in ksqlDB 0.8.1 and, whilst the query runs successfully, the table is not populated and these errors appear in the logs:
ksqldb-server | [2020-04-20 11:41:20,665] ERROR stream-thread [_confluent-ksql-default_query_CTAS_MYSQL_ALBUMS_5-47a67775-f6c2-4deb-b9a1-9c202a1a0f31-StreamThread-3] task [1_2] Failed to flush state store Aggregate-Aggregate-Materialize: (org.apache.kafka.streams.processor.internals.ProcessorStateManager:287)
ksqldb-server | java.lang.NullPointerException
ksqldb-server | at io.confluent.ksql.function.udaf.latest.LatestByOffset$1.map(LatestByOffset.java:155)
ksqldb-server | at io.confluent.ksql.function.udaf.latest.LatestByOffset$1.map(LatestByOffset.java:125)
ksqldb-server | at io.confluent.ksql.function.UdafAggregateFunction.lambda$null$3(UdafAggregateFunction.java:174)
ksqldb-server | at io.confluent.ksql.function.UdafAggregateFunction.timed(UdafAggregateFunction.java:180)
ksqldb-server | at io.confluent.ksql.function.UdafAggregateFunction.lambda$getResultMapper$4(UdafAggregateFunction.java:174)
ksqldb-server | at io.confluent.ksql.execution.function.udaf.KudafAggregator$ResultTransformer.transform(KudafAggregator.java:135)
ksqldb-server | at io.confluent.ksql.execution.function.udaf.KudafAggregator$ResultTransformer.transform(KudafAggregator.java:112)
ksqldb-server | at io.confluent.ksql.execution.streams.transform.KsTransformer.transform(KsTransformer.java:53)
ksqldb-server | at io.confluent.ksql.execution.streams.transform.KsTransformer.transform(KsTransformer.java:36)
ksqldb-server | at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:107)
ksqldb-server | at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:81)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
ksqldb-server | at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
ksqldb-server | at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
ksqldb-server | at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
ksqldb-server | at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)
ksqldb-server | at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
ksqldb-server | at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
ksqldb-server | at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
ksqldb-server | at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
ksqldb-server | at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
ksqldb-server | at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
ksqldb-server | at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
ksqldb-server | at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192)
ksqldb-server | at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
ksqldb-server | at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:282)
ksqldb-server | at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)
ksqldb-server | at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)
ksqldb-server | at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Are you saying that in 0.7.0 the query is rejected at compile time? That is because LATEST_BY_OFFSET was only introduced in 0.8.0. Also, we recently fixed this in https://github.com/confluentinc/ksql/pull/4975, which will be released in 0.9.0 and which I think will address your problem.
I can't say for sure, but I think this will probably fix the error, yes.
Sorry, I hadn't realised that the function didn't exist in 0.7.0. However, the error given doesn't state that the function doesn't exist, only that it should be in the GROUP BY part of the statement. That lack of clarity is what led to the confusion, and I guess there's a chance it could catch people out again.
Non-aggregate SELECT expression(s) not part of GROUP BY: [LATEST_BY_OFFSET(ALBUMS_SRC.ID), LATEST_BY_OFFSET(ALBUMS_SRC.RELEASE_DATE), LATEST_BY_OFFSET(ALBUMS_SRC.TITLE)]
I'm running into the same error message when trying to use LATEST_BY_OFFSET() as part of a table. For reference we are running the official ksql docker image confluentinc/cp-ksql-server:5.4.1.
5.4.1 does not include LATEST_BY_OFFSET.
Hey all. The issues here seem to pertain only to versions of KSQL that don't have LATEST_BY_OFFSET, or possibly to using the function on a table source (which isn't supported).
Hence I've raised a PR to ensure a more helpful error message is returned if the function doesn't exist. Merging that PR will close this issue. Please feel free to raise another issue if anyone feels their specific issue has not been addressed.
Hi @big-andy-coates, thanks for clarifying that this function doesn't work with tables; the documentation I read wasn't clear on this point.
There don't appear to be any other functions which would allow a table column (not in the GROUP BY) to reflect the most recent value only. For example, given an incoming stream of location data keyed by device ID, the table would reflect the last known location of each device.
I'd appreciate it if you could point me to any Confluent documentation that explains how to achieve this, because I haven't found anything to date.
@big-andy-coates @brightneuron I think my larger problem is LATEST_BY_OFFSET not being in the latest confluentinc/cp-ksql-server image, but it's also problematic if the function cannot be used within the create statement for a table. I saw no mention of that in the documentation. I have a somewhat similar use case to @brightneuron I believe, and to avoid adding the value I'm interested in to the group by clause I have to perform some aggregation on the field. LATEST_BY_OFFSET would solve that for me.
To be clear, the source I would be creating the table from is a stream.
LATEST_BY_OFFSET is available in 0.9 confluentinc/ksqldb-server image.
It can be used in a CREATE TABLE statement _where at least one source is a stream_. The documentation does mention this (though it's not super clear). See https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/aggregate-functions/#latest_by_offset, which states:
```
LATEST_BY_OFFSET(col1)
Stream
Return the latest value for a given column. Latest here is defined as the value in the partition with the greatest offset. Rows that have col1 set to null are ignored.
```
Notice the Stream on the line after the function name? That means it works on stream sources. Other functions may have Stream, Table.
The reason the function doesn't work on table sources is that tables can have data retracted from them, requiring the derived table to _undo_ some previous update. This is not feasible for many aggregate functions, including LATEST_BY_OFFSET, as they would need to maintain a full history of all values ever seen.
There don't appear to be any other functions which would allow a table column (not in the GROUP BY) to reflect the most recent value only.
What does your source data look like? If each record contains the values you want then you may be able to just import it as a change log topic, i.e. via a CREATE TABLE statement, rather than a CREATE STREAM.
If the topic is not correctly keyed, you may need to first import as a stream, repartition and reimport the sink topic as a table.
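A minimal sketch of that import, repartition and re-import sequence, using the device location use case mentioned earlier in the thread. Every stream, table, topic and column name here is hypothetical, the JSON value format is an assumption, and the `PRIMARY KEY` syntax is the form used by more recent ksqlDB releases (older versions declare keys differently), so treat it as an outline rather than a statement that will run unchanged on every version:

```sql
-- Hypothetical names throughout; assumes the topic 'device-events' already exists.

-- 1. Import the raw, incorrectly keyed topic as a stream.
CREATE STREAM device_events_raw (device_id VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (KAFKA_TOPIC='device-events', VALUE_FORMAT='JSON');

-- 2. Repartition so that every record is keyed by device_id.
CREATE STREAM device_events_by_id
  WITH (KAFKA_TOPIC='device-events-by-id') AS
  SELECT *
  FROM device_events_raw
  PARTITION BY device_id
  EMIT CHANGES;

-- 3. Re-import the sink topic as a table. A table treats the topic as a
--    changelog, so each key holds the most recent record written for it.
CREATE TABLE device_latest (device_id VARCHAR PRIMARY KEY, latitude DOUBLE, longitude DOUBLE)
  WITH (KAFKA_TOPIC='device-events-by-id', VALUE_FORMAT='JSON');
```

This only helps when each record carries every value you want to expose; it does not combine partial records the way an aggregate would.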
Hi, I'm getting the same error using CREATE TABLE with a stream as the source.
Example below. Should this be giving an error (using 0.8.1)?
CREATE STREAM 600_customer_events_stream WITH (PARTITIONS=6) AS SELECT *
FROM customer_events_raw
WHERE tenant=600 AND customer is not NULL
EMIT CHANGES;
CREATE TABLE 600_customer_events_agg WITH (PARTITIONS=6) AS SELECT
customer AS customer_id
,LATEST_BY_OFFSET(
CASE
WHEN event = 'subscribe_to_notification' AND EXTRACTJSONFIELD(context,'$.category')='water' THEN 'True'
WHEN event = 'unsubscribe_to_notification' AND EXTRACTJSONFIELD(context,'$.category')='water' THEN 'False'
END
) AS water_reminder_subscription
FROM 600_customer_events_stream
WINDOW HOPPING (SIZE 2 DAYS, ADVANCE BY 1 DAY)
GROUP BY customer
EMIT CHANGES;
Hi Andy, my table is based on a stream of incoming flight position events ("incoming_flight_events"). Each event in this stream contains only partial flight position data, represented in columns such as longitude, latitude, ground speed, altitude, etc.
So my hope was to create a table that could summarise each detected flight (contact) with a combination of some aggregate data (e.g. no. of associated events) along with the last known location data.
My table statement looks something like this (example error-inducing statements "commented out"):
CREATE TABLE table_flightcontacts
WITH (
kafka_topic='table.flightcontacts',
value_format='AVRO',
TIMESTAMP='last_messageDateTime'
)
AS SELECT
aircraftID,
COUNT(*) AS messages,
-- LATEST_BY_OFFSET(altitude) AS last_altitude,
-- LATEST_BY_OFFSET(groundSpeed) AS last_groundSpeed,
MIN(messageGenDateTime) AS first_messageDateTime,
MAX(messageGenDateTime) AS last_messageDateTime
FROM incoming_flight_events
WINDOW SESSION (60 MINUTES)
GROUP BY aircraftID
EMIT CHANGES;
Unfortunately, if I include the LATEST_BY_OFFSET expressions, the statement fails as above. This seems counter-intuitive, but I'm a relative ksqlDB newbie.
Hi @big-andy-coates, shouldn't this issue be reopened?
Based on your comments above, my use case shouldn't be failing.
I'm running ksqldb v5.5.0
Never mind. FYI for others with the same confusion: I clarified via Community Slack that this UDAF is available in standalone ksqlDB 0.8.1, but that version isn't yet packaged into Confluent Platform 5.5.0. Unfortunately, KSQL throws that unhelpful error message rather than reporting that the UDAF doesn't exist.
UDAF source is here: https://github.com/confluentinc/ksql/blob/master/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/latest/LatestByOffset.java
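One way to check up front whether a given server ships the function, instead of inferring it from the GROUP BY error, is to ask the server itself. A minimal sketch, assuming a ksql CLI session connected to the server in question; the output format differs between versions, so none is shown here:

```sql
-- List every function the connected server knows about. LATEST_BY_OFFSET
-- should appear in the list on servers that include it.
SHOW FUNCTIONS;

-- Describe the function itself. On a server that does not ship it, this
-- statement should fail rather than print a description.
DESCRIBE FUNCTION LATEST_BY_OFFSET;
```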
Hit the same problem. The documentation should clearly state in which version of ksqlDB each function was introduced. It is also NOT clear which version of ksqlDB is packaged in Confluent Platform 5.5: either the Docker images should be versioned according to the source version, or there should be a clear way to retrieve the version from Confluent Control Center.
Spent half a day wondering what I had missed, eventually getting down to the simplest aggregation, like
create stream test (id string (KEY), name string) with (KAFKA_TOPIC='test', VALUE_FORMAT='JSON', partitions = 2);
create table test_table as
select id, LATEST_BY_OFFSET(name)
from test
group by id;
just to find out that LATEST_BY_OFFSET is not included in Confluent Platform 5.5.
PLEASE invest more time in the documentation; the TODOs in the docs look ridiculous.
The LATEST_BY_OFFSET aggregate function works well in Confluent Platform 5.5.1-2.12.
I tested this feature and it worked.