My customer has a set of Kafka topics sourced from a mixture of REST API processes and Oracle GoldenGate. These processes cannot readily be suspended (well, they can, but it is not trivial given the number of moving parts). They also have some KSQL code that processes and transforms data as it passes to another system; this uses multiple KSQL tables and streams.
When they need to modify the KSQL code, they have to terminate the running queries, then drop and re-create the KSQL streams and tables (CSAS and CTAS).
The problem is that the re-created queries start from the latest offset of the source topic, so any data that arrives between TERMINATE and CREATE is lost. They had hoped that, because the KSQL topic names were unchanged, the previously read offset would be picked up from the underlying topic metadata. However, each new query appears to use its own uniquely named consumer group. The alternative of setting the offset to earliest is not viable, because it puts a large amount of already processed data onto the output streams.
Is there a way to associate a query with a specific start offset?
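To make the trade-off concrete, here is a toy in-memory model in Python (no Kafka client; the class, function, and group names are hypothetical). It assumes a single partition and mimics only the rule that matters here: a group with a committed offset resumes from it, while a brand-new group falls back to auto.offset.reset.

```python
class Topic:
    """Toy single-partition topic with per-group committed offsets (no Kafka client)."""

    def __init__(self):
        self.records = []
        self.committed = {}  # consumer group -> next offset to read

    def produce(self, value):
        self.records.append(value)

    def subscribe(self, group, reset):
        # A group with a committed offset resumes from it; a brand-new group
        # falls back to its auto.offset.reset policy.
        if group not in self.committed:
            self.committed[group] = 0 if reset == "earliest" else len(self.records)

    def poll(self, group):
        # Return everything from the group's position and commit the new end.
        start = self.committed[group]
        self.committed[group] = len(self.records)
        return self.records[start:]


def run_upgrade(reset):
    """Terminate a query, keep producing, then re-create it under a new group."""
    topic = Topic()
    topic.subscribe("query_CSAS_OUT_0", reset)   # original query (hypothetical id)
    for i in range(5):
        topic.produce(f"v1-{i}")
    topic.poll("query_CSAS_OUT_0")               # processed before the release
    for i in range(3):                           # TERMINATE/DROP: sources keep writing
        topic.produce(f"gap-{i}")
    topic.subscribe("query_CSAS_OUT_1", reset)   # CREATE ... AS SELECT: new group id
    for i in range(2):
        topic.produce(f"v2-{i}")
    return topic.poll("query_CSAS_OUT_1")


print(run_upgrade("latest"))         # ['v2-0', 'v2-1']  (gap-0..gap-2 are lost)
print(len(run_upgrade("earliest")))  # 10  (v1-0..v1-4 are reprocessed)
```

Neither reset policy gives "exactly the records written while the query was down", which is the whole point of the question.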
Here's a little more insight on this one. Currently, the consumer group for a KSQL persistent query is derived from ksql.service.id plus the queryId of the statement. The queryId is built from the object name with an auto-incrementing _N suffix. Therefore, whenever a query is terminated and re-created (as part of a release, a deployment, etc.), the persistent query gets a brand-new consumer group, and any messages that arrive on the topic between TERMINATE and CREATE... evaporate into the ether.
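The naming scheme described above can be sketched in a few lines of Python. The `_confluent-ksql-<service.id>query_<queryId>` pattern, the `default_` service id, and the single shared counter are assumptions for illustration; the group names your own cluster reports (for example via `kafka-consumer-groups --bootstrap-server <broker> --list`) are the authority.

```python
import itertools


class QueryIdGenerator:
    """Simplified model: each new persistent query takes the next counter value."""

    def __init__(self):
        self._counter = itertools.count()

    def next_id(self, statement_type, object_name):
        return f"{statement_type}_{object_name.upper()}_{next(self._counter)}"


def consumer_group(service_id, query_id, prefix="query_"):
    # Assumed pattern: _confluent-ksql-<ksql.service.id><prefix><queryId>
    return f"_confluent-ksql-{service_id}{prefix}{query_id}"


ids = QueryIdGenerator()
first = ids.next_id("CSAS", "orders_enriched")
# TERMINATE, DROP STREAM, then the identical CREATE STREAM ... AS SELECT again:
second = ids.next_id("CSAS", "orders_enriched")

print(consumer_group("default_", first))   # _confluent-ksql-default_query_CSAS_ORDERS_ENRICHED_0
print(consumer_group("default_", second))  # _confluent-ksql-default_query_CSAS_ORDERS_ENRICHED_1
```

Because the suffix changes, Kafka sees a group it has never met, so there is no committed offset to resume from.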
One approach is to set ksql.streams.auto.offset.reset to earliest, but that just means events will be reprocessed, and we then need logic to handle the duplicates. That simply ignores the main reason for using Kafka... offset management.
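The extra logic that reprocessing from earliest pushes downstream typically looks like a high-water-mark filter. A minimal Python sketch, assuming each output record carries hypothetical `src_partition`/`src_offset` fields copied from its source record, and that source offsets arrive in increasing order within a partition:

```python
class OffsetDeduplicator:
    """Drop records whose source offset was already processed, per partition.

    Hypothetical helper: the high-water marks must be persisted durably
    (e.g. alongside the data in the warehouse) to survive restarts.
    """

    def __init__(self, high_water=None):
        self.high_water = dict(high_water or {})  # partition -> last processed offset

    def accept(self, partition, offset):
        if offset <= self.high_water.get(partition, -1):
            return False  # already handled before the query was re-created
        self.high_water[partition] = offset
        return True


# Offsets 0..4 of partition 0 were processed before the re-create from 'earliest'.
dedup = OffsetDeduplicator({0: 4})
replayed = [{"src_partition": 0, "src_offset": o} for o in range(8)]
fresh = [r for r in replayed if dedup.accept(r["src_partition"], r["src_offset"])]
print([r["src_offset"] for r in fresh])  # [5, 6, 7]
```

This works, but it re-implements on the consumer side exactly the bookkeeping that committed consumer-group offsets already provide.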
A proposed solution is to allow setting the Kafka Streams application id (application.id) as part of the KSQL WITH (...) clause. This is exactly how Kafka Streams applications work (using defined application ids), and it should be workable here.
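The proposal is not an existing KSQL option; this toy Python model (hypothetical names `Cluster`, `deploy`, `app_id`, single partition) only illustrates why a stable id would close the gap: a re-created query that rejoins a known group is handed that group's committed offset instead of falling back to auto.offset.reset.

```python
import itertools


class Cluster:
    """Toy model of one topic partition plus committed offsets per consumer group."""

    def __init__(self):
        self.records = []
        self.committed = {}

    def produce(self, value):
        self.records.append(value)

    def start(self, group):
        # Existing group: resume from its committed offset.
        # New group: behave like auto.offset.reset=latest.
        return self.committed.setdefault(group, len(self.records))

    def poll(self, group):
        start = self.committed[group]
        self.committed[group] = len(self.records)
        return self.records[start:]


def deploy(cluster, counter, app_id=None):
    """app_id=None mimics today's fresh generated id per CREATE;
    a fixed app_id mimics the proposed user-supplied application id."""
    group = app_id or f"query_CSAS_OUT_{next(counter)}"
    cluster.start(group)
    return group


def upgrade(app_id=None):
    cluster, counter = Cluster(), itertools.count()
    group = deploy(cluster, counter, app_id)
    for i in range(3):
        cluster.produce(f"old-{i}")
    cluster.poll(group)
    for i in range(2):  # written between TERMINATE and CREATE
        cluster.produce(f"gap-{i}")
    group = deploy(cluster, counter, app_id)
    return cluster.poll(group)


print(upgrade())                          # []
print(upgrade(app_id="orders_pipeline"))  # ['gap-0', 'gap-1']
```

With a stable id the gap records are delivered once, with neither loss nor reprocessing.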
This KSQL behavior (generating a brand-new consumer group for each re-created persistent query) also has some other side effects.
During development we re-created the KSQL queries many times, and it left behind a lot of orphaned consumer groups and changelog topics. Can these be cleaned up when the query is terminated, or should the TERMINATE command have an option to delete topics?
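Until TERMINATE cleans up after itself, the leftovers can at least be identified. A hypothetical Python helper, assuming the `_confluent-ksql-<service.id>query_<queryId>` naming for groups and that a query's internal topics (changelog, repartition) use its group name as a prefix; all names below are illustrative. After review, the resulting lists could be passed to the standard `kafka-consumer-groups --delete --group` and `kafka-topics --delete --topic` tools.

```python
def find_orphans(groups, topics, running_query_ids, service_id="default_"):
    """Return (orphaned groups, orphaned internal topics) for one KSQL service."""
    prefix = f"_confluent-ksql-{service_id}query_"
    live = {prefix + qid for qid in running_query_ids}

    def is_orphan(name):
        if not name.startswith(prefix):
            return False  # not owned by a persistent query of this service
        # The "-" check stops ..._1 from claiming ..._10-... as its own topic.
        return not any(name == g or name.startswith(g + "-") for g in live)

    return sorted(filter(is_orphan, groups)), sorted(filter(is_orphan, topics))


groups = [
    "_confluent-ksql-default_query_CSAS_ORDERS_0",
    "_confluent-ksql-default_query_CSAS_ORDERS_1",
    "other-app",
]
topics = [
    "_confluent-ksql-default_query_CTAS_TOTALS_2-agg-changelog",
    "_confluent-ksql-default_query_CTAS_TOTALS_3-agg-changelog",
    "orders",
]
old_groups, old_topics = find_orphans(groups, topics, {"CSAS_ORDERS_1", "CTAS_TOTALS_3"})
print(old_groups)  # ['_confluent-ksql-default_query_CSAS_ORDERS_0']
print(old_topics)  # ['_confluent-ksql-default_query_CTAS_TOTALS_2-agg-changelog']
```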
We are building pipelines through KSQL to feed our analytics warehouse. Our policy is that minor schema version changes may only add new fields. It would be great if, instead of setting the offset to earliest, we could re-create the pipeline of streams and pass a start offset to the first stream in the pipeline, so that all our streams pick up right where we left off rather than starting from the beginning...
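As the thread notes, KSQL has no way to pass a start offset, so this is only a Python model of the requested behavior (hypothetical names): snapshot the old first stream's committed offsets per partition at TERMINATE time, then start the re-created stream from them. In a plain Kafka client the equivalent step is `KafkaConsumer.seek(TopicPartition, long)`.

```python
def resume_from(partitions, committed):
    """Return {partition: records} the re-created first stream would read.

    partitions: {partition: [records in offset order]}
    committed:  {partition: next offset to read}, snapshotted from the old
                query's consumer group at TERMINATE time.
    A partition with no committed offset starts at 0, which errs on the side
    of duplicates rather than loss.
    """
    return {p: recs[committed.get(p, 0):] for p, recs in partitions.items()}


# State of the source topic after the pipeline was dropped and edited.
partitions = {0: ["a0", "a1", "a2", "a3"], 1: ["b0", "b1", "b2"]}
# Offsets the old first stream had committed (a0, a1 and b0 were processed).
committed_at_terminate = {0: 2, 1: 1}

print(resume_from(partitions, committed_at_terminate))
# {0: ['a2', 'a3'], 1: ['b1', 'b2']}
```

Only the first stream needs explicit offsets: the downstream streams read its (re-created) output, so they see each source record once.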
@nbyrnes-acv Read this blog and see if you think it can help. If you have any questions, feel free to reach out.
I'm really surprised this issue is not rated higher. I must be missing something... are other people working around this somehow?
cc @agavra. Worth tracking this as part of the new work to support query upgrades. This could be low-hanging fruit to tackle in an early milestone; the ability to resume is something we could fix early in the process.
Yes, I believe someone is already working on a KLIP for this (cc @eshepelyuk), but I totally agree this is something we should implement: #4622
This will be available, in some fashion, in the next release (0.12.0)! See the documentation here: https://github.com/confluentinc/ksql/blob/master/docs/concepts/upgrades.md