Related to #528.
Executing a SELECT on a table via the REST interface streams the results back. It would be useful to provide a single shot HTTP query of a table that simply did a distributed KTable query without any streaming. Right now this doesn't seem to be possible.
TL;DR: This feature request is primarily about adding functionality to KSQL so that it is possible to do a SELECT * FROM <table> with semantics similar to those in the RDBMS world (MySQL, Postgres, Oracle, etc).
Today, however, KSQL only supports streaming queries, so a SELECT query against a TABLE will continue to run, and will also include the same key multiple times in the query output if there were subsequent changes to the key.
As already said via Slack: Thanks for reporting @joewood. :)
I assume what you mean by this is the following? Imagine you have a users TABLE with 10 records (= 10 unique users). In this case, you want a way to query the table's current data via e.g. SELECT * FROM users, which should return 10 output records and then terminate (rather than keeping the query running, waiting for changes to the table that will then be outputted, too).
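To make the distinction concrete, here is a minimal sketch of the two behaviours, not KSQL code. It assumes a table can be modelled as a changelog of (key, value) pairs in which a value of None stands for a delete. The names `streaming_select` and `snapshot_select` are hypothetical and exist only for this illustration.

```python
# Hypothetical model: a table's changelog is an ordered list of (key, value)
# pairs. A value of None is assumed to mean "delete this key".
changelog = [
    ("alice", {"city": "Oslo"}),
    ("bob", {"city": "Rome"}),
    ("alice", {"city": "Paris"}),  # later update to an existing key
]


def streaming_select(changes):
    """Streaming behaviour: emit every change, so a key can appear repeatedly.

    A real streaming query would also keep waiting for new changes; this
    sketch stops when the finite list is exhausted.
    """
    for key, value in changes:
        yield key, value


def snapshot_select(changes):
    """Requested behaviour: return the latest value per key, then terminate."""
    table = {}
    for key, value in changes:
        if value is None:
            table.pop(key, None)  # assumed delete marker removes the key
        else:
            table[key] = value  # later changes overwrite earlier ones
    return table


if __name__ == "__main__":
    print(list(streaming_select(changelog)))  # three rows, "alice" twice
    print(snapshot_select(changelog))  # two rows, one per user
```

In this model the streaming form returns one row per change, while the snapshot form returns one row per unique key, which is the "10 users, 10 output records" behaviour described above.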
@miguno yes, here are a couple more suggestions (based on how we use the streams API now for distributed queries):
SELECT * FROM users where key="<value>". This would map to store(name).get(key)
SELECT * FROM users where key BETWEEN "<start>" AND "<end>". Mapping to store(name).range(start,end)
SELECT * FROM users OFFSET 10 ROWS FETCH NEXT 5 ROWS ONLY or with an implicit ROW_INDEX. Right now we do pagination by performing a range and limiting the iteration loop (which isn't ideal). (related to #528)
The above would need to be executed distributed across the KSQL processes. The simple key based queries could be clever and direct the query to the owning node based on partition assignment (we don't do that now, but it's a future optimization).
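The three query shapes above can be sketched against a single in-memory store. This is only an illustration in Python, not the Kafka Streams API: `SortedStore` and `page` are hypothetical names, the store is assumed to iterate in key order, and the range is assumed to be inclusive at both ends (as SQL BETWEEN is). Distribution across processes is left out.

```python
from bisect import bisect_left, bisect_right
from itertools import islice


class SortedStore:
    """Hypothetical stand-in for one local, key-ordered state store."""

    def __init__(self, items):
        self._data = dict(items)
        self._keys = sorted(self._data)

    def get(self, key):
        # WHERE key = "<value>": a single point lookup.
        return self._data.get(key)

    def range(self, start, end):
        # WHERE key BETWEEN "<start>" AND "<end>": both bounds inclusive.
        lo = bisect_left(self._keys, start)
        hi = bisect_right(self._keys, end)
        for key in self._keys[lo:hi]:
            yield key, self._data[key]

    def all(self):
        # Full scan in key order.
        for key in self._keys:
            yield key, self._data[key]


def page(rows, offset, fetch):
    """OFFSET <offset> ROWS FETCH NEXT <fetch> ROWS ONLY by limiting iteration."""
    return list(islice(rows, offset, offset + fetch))


if __name__ == "__main__":
    store = SortedStore({"u03": "carol", "u01": "alice", "u02": "bob", "u04": "dave"})
    print(store.get("u02"))
    print(list(store.range("u02", "u03")))
    print(page(store.all(), offset=1, fetch=2))
```

Note that `page` still walks past the skipped rows before returning anything, which is the "limiting the iteration loop" approach described above and shows why it is not ideal for deep offsets.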
Hi @joewood, can you share details about the use cases that essentially require KSQL tables to look like 'point in time snapshots' of your stream?
Is it to use KSQL with another system which expects the regular SQL syntax and semantics? Some details on the context of the problem you are trying to solve may suggest workarounds. At the very least it will give us some data which would help in prioritization.
This type of feature is a big investment, so more data points will definitely help moving it forward.
Thanks!
Hi @apurvam the use-case is to be able to access the state stores that KSQL is using. This allows external clients to view the state of the KSQL stores like a data repository. For example, I may have a KSQL statement that is used to create an aggregation of products based on orders placed. I may want a simple UI to show the current point in time aggregation by using a simple REST call (similar to the distributed queries in the Confluent Examples). The query could be simply by the store key (or range of store keys). I don't think a full SQL expression eval over every row is required (essentially query by key only).
I think the bigger question here is how KSQL is designed to be used. The current REST interface is an HTTP stream, which is fine - but it doesn't scale very well to multiple consumers. I don't see an easy way to use KSQL programmatically. I may be missing something as I haven't checked back since December.
Not sure I follow. KSQL is a simpler way to do stream processing, i.e. read from Kafka topic(s) and write to Kafka topic(s). The outputs in the Kafka topics can be consumed in a variety of ways by multiple consumers, like dumping to an external store using Kafka Connect, feeding real time dashboards, etc.
For your example above, you could just drop the aggregates generated to an external store and use that to build your UI dashboard. There are already tools to facilitate this (like a variety of sink connectors). What advantage do you see of querying state stores directly?
Hi @apurvam, sure, I could write a separate streams app that took the backing topic from an aggregate store in KSQL, re-populated the store and exposed it over HTTP. That just feels like a lot of additional resources given that the store already exists in the KSQL processes.
What is the difference between a table and a stream if the current state of a table is not accessible?
Copying from the terminology documentation:
A table is a view of a stream, or another table, and represents a collection of evolving facts. For example, we could have a table that contains the latest financial information such as "Bob's current account balance is $150". It is the equivalent of a traditional database table but enriched by streaming semantics such as windowing. Facts in a table are mutable, which means new facts can be inserted to the table, and existing facts can be updated or deleted. Tables can be created from a Kafka topic or derived from existing streams and tables. In both cases, a table's underlying data is durably stored (persisted) within a Kafka topic on the Kafka brokers.
I'm confused as to how that example would actually be implemented. How can I turn a stream of account balance updates into a table where I can retrieve a user's current balance?
My initial impression was that select on a table would return the "point in time" results as this issue is requesting. I stumbled upon this issue while trying to figure out why that isn't the case.
Thanks for your comment @instantdelay. Those are reasonable questions. I think there are two separate issues brought up in your post.
The notion of a table still makes sense even if we don't currently support point in time queries on these tables. For instance, we support joins between streams and tables today. For this operation, the definition of a table above makes sense: when a table is joined with a stream, the _latest_ value for a given key in the table is what is joined with the incoming message in the stream.
This is not to say that we _won't_ support point in time queries. But even without them the semantics of a table you laid out can still be used, just not through simple 'select' statements.
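As a rough illustration of the stream-table join described above, here is a small Python model, not KSQL itself. It assumes table updates and stream messages arrive interleaved in a single ordered list, and that stream messages with no matching key are dropped (inner join behaviour). The name `stream_table_join` and the event layout are hypothetical.

```python
def stream_table_join(events):
    """Join each stream message with the latest table value for its key.

    `events` is an ordered list of (kind, key, value) tuples, where kind is
    "table" for a table update or "stream" for an incoming stream message.
    """
    table = {}  # latest value per key, as described for a table
    joined = []
    for kind, key, value in events:
        if kind == "table":
            table[key] = value  # a newer fact replaces the older one
        elif key in table:
            # The stream message sees whatever the table holds right now.
            joined.append((key, value, table[key]))
        # Stream messages without a matching key are dropped (assumed inner join).
    return joined


if __name__ == "__main__":
    events = [
        ("table", "bob", 100),
        ("stream", "bob", "login"),
        ("table", "bob", 150),  # balance changes before the next message
        ("stream", "bob", "purchase"),
        ("stream", "eve", "login"),  # no table entry for this key
    ]
    print(stream_table_join(events))
```

The first message is joined with 100 and the second with 150, so the table behaves as "latest value per key" even though it cannot be queried directly.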
Hope this makes sense.
We would require point in time queries for our use case, too. We built several microservices which store their data in global KTables. Currently we use a workaround to query the global KTable for attributes of the stored data (e.g. SELECT * FROM users WHERE logged_in = true).
We too have a use case for point-in-time query support. Just checking whether there is an ETA for when this will be supported.
Are there any updates on the point-in-time queries? Do you have an ETA?
No ETA yet @tobihofmann
We have a use case for doing KSQL point-in-time queries for our application as well.
We also would like KSQL point-in-time queries. It is a much simpler solution than funneling the data to an external datastore, and allows the retrieval to remain in KSQL. I am building an API to allow people to run KSQL statements directly, and it would be much more intuitive to be able to simply run a select statement and be guaranteed to receive all the data currently in a table at that time or have an error occur, as in a standard SQL database. The most useful aspect is knowing when all the data in the table has actually been read. Currently, there is no good way to guarantee that all table rows have been read from a KTable unless one sets a long timeout, which is definitely not something I want to do.
Does anyone know a good workaround to achieve point-in-time queries? Thanks!
I am looking for the same as well. Any updates on this?
On a side note, is something like this available in Streams API itself for Kafka?
"Interactive queries". But some pieces are missing.
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html
https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
I also have a use case in my application with the Streams API. Wondering if the Streams API provides support for the same, and also for pagination and limit.
We're also looking for something like this. Any updates here?
This is being queued up to be worked on shortly. ❤️
made my day! any ETA yet?
@desixma :) Not yet, but we'll keep this issue posted with all the updates. All of the design work will happen on GitHub.
The design discussion for this is at #3117
This is finally available in ksqlDB 0.6.0! :)
Release: https://ksqldb.io/quickstart.html
Docs: https://docs.ksqldb.io/en/latest/concepts/queries/pull/