Ksql: Being able to know how far along the stream processing is

Created on 17 Aug 2018  ·  11 Comments  ·  Source: confluentinc/ksql

Hi,

I searched the docs but didn't find anything, so I'm posting here:

Suppose I create a persistent STREAM named view_usersAndOrders that joins my users and orders topics. If each of those topics holds 6 GB of data, it could take a while for the KSQL cluster to process all of it and write the results to the Kafka topic view_usersAndOrders.

So if I launch my microservice consuming the view_usersAndOrders topic, how can it know that it has received enough events to have approximately the "final" state of the view?

In other words, I don't want this microservice to accept user requests until it's ready.

A first step could be to compare the microservice consumer's offset with the offset of the last event in view_usersAndOrders. That way I know whether my consumer has the most recent event in this topic _(we could consider the microservice ready as soon as it receives an event timestamped less than 10 seconds ago, because if the topic receives 100 events per second, the consumer may never reach the "latest" event in the topic)_
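A minimal Python sketch of this first check, assuming the consumer's positions and the view topic's end offsets (high watermarks) per partition have already been fetched from the Kafka client (for example with confluent-kafka's `Consumer.position()` and `Consumer.get_watermark_offsets()`). The function names and the 10-second threshold are illustrative, and timestamps are in seconds here even though Kafka record timestamps are epoch milliseconds.

```python
import time
from typing import Dict, Optional

# Hypothetical threshold: "received an event timestamped less than 10 seconds ago".
FRESHNESS_SECONDS = 10.0


def consumer_lag(positions: Dict[int, int], end_offsets: Dict[int, int]) -> int:
    """Total records between the consumer's position and the log end, over all partitions.

    Partitions with no known position are treated as unread from offset 0
    (a simplification for this sketch).
    """
    return sum(max(end - positions.get(p, 0), 0) for p, end in end_offsets.items())


def view_consumer_ready(
    positions: Dict[int, int],
    end_offsets: Dict[int, int],
    last_event_ts: Optional[float],
    now: Optional[float] = None,
) -> bool:
    """Ready if fully caught up, or if the newest event seen is recent enough."""
    now = time.time() if now is None else now
    if consumer_lag(positions, end_offsets) == 0:
        return True
    return last_event_ts is not None and now - last_event_ts < FRESHNESS_SECONDS
```

Because the freshness rule is OR-ed with the lag check, a busy topic whose lag never reaches zero can still count as ready. As the next paragraph points out, this check only covers the view topic, not KSQL's own progress on the source topics.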

But that's not enough, because having nearly the latest event in the view_usersAndOrders topic doesn't mean that the KSQL cluster has finished processing nearly all of the users and orders topics. Example:

  • The microservice consumer has consumed 100% of the view_usersAndOrders topic
  • The KSQL cluster has processed 25% of the JOINs between my users and orders topics
  • So my microservice consumer has in reality consumed only 25% of the data it needs

Since my JOIN may involve important data, I can't afford to consider my microservice ready while it is unaware of the latest events that the KSQL cluster has not yet processed.

I guess there is a way to compare the offsets my KSQL cluster has consumed from the users and orders topics with the latest offsets in those topics. But I don't know where to access this kind of information... 😢
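A sketch of the end-to-end check the 100%/25% example above calls for, under stated assumptions: a KSQL persistent query runs as a Kafka Streams application, so its committed offsets on the users and orders topics belong to a consumer group named after the query's application id (typically of the form `_confluent-ksql-<service id>query_<query id>`; verify with `kafka-consumer-groups --list`). Fetching the offsets is left out; the functions below are hypothetical and only do the arithmetic over both stages.

```python
from typing import Dict, Tuple

# (topic, partition) pairs map to offsets. In practice these would be the
# committed offsets of the KSQL query's consumer group and the brokers' end
# offsets; how they are fetched is out of scope for this sketch.
TopicPartition = Tuple[str, int]
Offsets = Dict[TopicPartition, int]


def lag_per_topic(committed: Offsets, end_offsets: Offsets) -> Dict[str, int]:
    """Per topic, how many records exist beyond the committed offset (missing = 0)."""
    lag: Dict[str, int] = {}
    for (topic, partition), end in end_offsets.items():
        behind = max(end - committed.get((topic, partition), 0), 0)
        lag[topic] = lag.get(topic, 0) + behind
    return lag


def pipeline_caught_up(ksql_committed: Offsets, source_end: Offsets,
                       view_positions: Offsets, view_end: Offsets,
                       max_source_lag: int = 0, max_view_lag: int = 0) -> bool:
    """Ready only if KSQL has caught up on the sources AND the service on the view."""
    source_ok = all(lag <= max_source_lag
                    for lag in lag_per_topic(ksql_committed, source_end).values())
    view_ok = all(lag <= max_view_lag
                  for lag in lag_per_topic(view_positions, view_end).values())
    return source_ok and view_ok
```

Committed offsets only advance at each commit, so they trail actual processing slightly; the `max_*_lag` tolerances are there to absorb that and a constant inflow of new records.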

I hope what I just wrote is understandable 👍

Thank you,

enhancement

All 11 comments

Unfortunately at this time, there is no programmatic way to figure out how much of the inputs to a join have been processed.

Hi @apurvam! I'll try not to repeat myself, but after reading some Confluent articles advising the use of an optimized view in each microservice, I tried to make it work with KSQL as described here:
https://github.com/confluentinc/confluent-kafka-go/issues/181#issuecomment-413544557

Knowing how far my KSQL processing has progressed is really important for marking my microservice as ready. From your answer, I guess there is currently no other way to work around this...

But is it on your roadmap? And if it isn't, could you ask the Confluent team what the best way would be to make my approach work correctly with some kind of "readiness" metric?

@miguno said my approach seems good, but if KSQL can't report how far the stream has been processed, I don't understand what you would suggest.

Thank you for answering me and for all your amazing work 😎

Hi @apurvam 😃

Is there any plan to support this feature? Without it, I have no way to mark my microservices as ready, since I can't guarantee that the view topic generated by KSQL is nearly in sync with the latest source data. I really think it's an important feature for KSQL.

Thank you,

I think from an engineering perspective, encoding the 'progress' of a join in a stream so that downstream consumers can make decisions (like whether they should serve traffic) based on this information is going to be a big investment and may even be impossible to do with 100% accuracy.

If we can collect more use cases, then we can gauge whether the ROI makes sense. cc @MichaelDrogalis @miguno, who may have more information here.

Having said that, KStreams and KSQL are open source, so if this is important enough, please feel free to explore ways to solve it for your use case.

@apurvam thank you for your answer 😃

I know it could be hard to be accurate, but for the use cases Confluent describes, where microservices use a local view generated by KSQL, it would be better to allow inbound traffic only once the view is nearly up to date.

So I don't expect exact information at a specific point in time, but rather something like the following two pieces of information for a specific KSQL query:

  • what percentage of each source topic "bound" to the KSQL query the KSQL cluster has processed
  • the timestamp of the latest event KSQL has processed for this KSQL query
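One way these two proposed metrics could be represented and computed, assuming the query's committed offsets and each source partition's (low, high) watermarks are available. `QueryProgress` is a hypothetical record, not an existing KSQL API, and "percent processed" is taken here to mean committed records over the records currently retained in the topic.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple

TopicPartition = Tuple[str, int]


@dataclass
class QueryProgress:
    """Hypothetical progress record for one KSQL query."""
    query_id: str
    percent_processed: Dict[str, float] = field(default_factory=dict)
    latest_event_ts: Optional[float] = None  # seconds since epoch


def percent_processed(committed: Dict[TopicPartition, int],
                      watermarks: Dict[TopicPartition, Tuple[int, int]]) -> Dict[str, float]:
    """Per topic: processed records / retained records, using (low, high) watermarks."""
    done: Dict[str, int] = {}
    total: Dict[str, int] = {}
    for tp, (low, high) in watermarks.items():
        topic = tp[0]
        # No committed offset yet means nothing processed; clamp into [low, high].
        pos = min(max(committed.get(tp, low), low), high)
        done[topic] = done.get(topic, 0) + (pos - low)
        total[topic] = total.get(topic, 0) + (high - low)
    # An empty topic counts as fully processed.
    return {t: 100.0 if total[t] == 0 else 100.0 * done[t] / total[t] for t in total}
```

For example, `QueryProgress("CSAS_VIEW_USERSANDORDERS_0", percent_processed(committed, watermarks), latest_ts)` would bundle both metrics for one (hypothetically named) query.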

That way I can define my own rules on my side, and simply mark my microservice as ready if:

  1. the latest event processed by KSQL for this query is less than 30 seconds old
  2. OR, if the KSQL cluster receives so many events per second that it can't process them fast enough, it has processed at least 80% of my source topics
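The two rules can be sketched as a single predicate over those (hypothetical) metrics. Reading "80% of my source topics" as "every source topic is at least 80% processed" is an assumption; a weighted average across topics would be another possible reading.

```python
import time
from typing import Dict, Optional

MAX_EVENT_AGE_SECONDS = 30.0  # rule 1: latest processed event is < 30 s old
MIN_PERCENT_PROCESSED = 80.0  # rule 2: every source topic is >= 80% processed


def is_ready(latest_event_ts: Optional[float],
             percent_processed: Dict[str, float],
             now: Optional[float] = None) -> bool:
    """Apply rule 1 OR rule 2; an empty metrics map never satisfies rule 2."""
    now = time.time() if now is None else now
    fresh = latest_event_ts is not None and now - latest_event_ts < MAX_EVENT_AGE_SECONDS
    mostly_done = bool(percent_processed) and all(
        p >= MIN_PERCENT_PROCESSED for p in percent_processed.values())
    return fresh or mostly_done
```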

I don't know exactly where this data should be shared, but it could be written to a topic, for example once per minute so as not to flood the topic. Compaction could also be enabled on it.
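A sketch of the publishing side, assuming a hypothetical `send(key, value)` callable in place of a real Kafka producer. Keying each record by query id is what makes compaction useful: with `cleanup.policy=compact` on the topic, Kafka eventually keeps only the latest progress record per query.

```python
import json
import time
from typing import Callable, Dict, Optional


class ProgressPublisher:
    """Hypothetical: emit a query's progress at most once per interval."""

    def __init__(self, send: Callable[[str, str], None],
                 interval_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._send = send            # stands in for a Kafka producer call
        self._interval = interval_seconds
        self._clock = clock
        self._last_sent: Dict[str, float] = {}

    def maybe_publish(self, query_id: str, progress: dict) -> bool:
        """Send the progress keyed by query id, unless one was sent too recently."""
        now = self._clock()
        last: Optional[float] = self._last_sent.get(query_id)
        if last is not None and now - last < self._interval:
            return False  # throttled so the topic isn't flooded
        self._send(query_id, json.dumps(progress, sort_keys=True))
        self._last_sent[query_id] = now
        return True
```

Throttling per query id rather than globally keeps one busy query from suppressing updates for the others.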

Another idea would be to expose it through the KSQL Server API without using topics, but I guess each KSQL instance in a cluster is not aware of its siblings and cannot share information with them?

As you said, the ROI has to make sense to implement it. But since creating a view of my topics through KSQL and launching a microservice right afterwards carries a high risk of serving wrong (or stale) information to users of this microservice, I think it's important enough, no?

Maybe I missed something? Maybe other people using local views with KSQL processing already have a solution? I'm just trying to understand how the pattern Confluent shows can meet the need of serving an up-to-date view.

Thank you,

I understand the problem, but as @apurvam mentioned, this is a non-trivial addition. One thing that's striking is that the notion of "progress" is primarily defined by application logic.

In the interest of making progress (no pun intended), can we back up for a moment and talk about the schemas over the two streams being joined? Is there a property of the events themselves that can signal staleness as part of a materialized view?

@MichaelDrogalis I understand it depends on the application logic; that's why I'm not asking KSQL to mark a view as ready, but rather to expose some "metrics" the microservice can use to make its own readiness decision.

It would still be generic if KSQL exposed the processing progress of the source topics for each STREAM/TABLE. It isn't specific to my use case (I think 😄 )

BTW, I'm not sure whether the last two questions were meant for me, but the ROWTIME value is available inside events:
If we have two source topics (users and orders) and want to create a stream "orders_view", we could SELECT users.ROWTIME AS userEventTime, orders.ROWTIME AS orderEventTime, and then in the microservice I could set rules like:

  • for each event I receive, I take the most recent ROWTIME of userEventTime and orderEventTime
  • if I receive an event whose ROWTIME is less than 10 seconds old, I can consider my microservice ready
  • OR, if I don't receive any new events for 30 seconds, KSQL has probably finished populating the view topic and I can mark my microservice as ready
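These rules could be implemented in the microservice roughly as follows (KSQL's timestamp pseudo-column is `ROWTIME`, in epoch milliseconds). The class and field names are hypothetical, and readiness is deliberately sticky: once ready, the service stays ready.

```python
import time
from typing import Optional

FRESH_MS = 10_000  # an event whose newest ROWTIME is < 10 s old => ready
IDLE_MS = 30_000   # no new events for 30 s => KSQL has probably finished


def _now_ms() -> int:
    return int(time.time() * 1000)


class RowtimeReadiness:
    """Hypothetical readiness tracker fed by events from the view topic."""

    def __init__(self, start_ms: Optional[int] = None) -> None:
        self.ready = False
        # Idle time is measured from startup until the first event arrives.
        self._last_activity_ms = _now_ms() if start_ms is None else start_ms

    def on_event(self, user_event_time: int, order_event_time: int,
                 now_ms: Optional[int] = None) -> None:
        now_ms = _now_ms() if now_ms is None else now_ms
        self._last_activity_ms = now_ms
        newest = max(user_event_time, order_event_time)  # most recent source ROWTIME
        if now_ms - newest < FRESH_MS:
            self.ready = True

    def on_tick(self, now_ms: Optional[int] = None) -> None:
        """Call periodically (e.g. from a timer) to apply the idle rule."""
        now_ms = _now_ms() if now_ms is None else now_ms
        if now_ms - self._last_activity_ms >= IDLE_MS:
            self.ready = True
```

The idle rule shares the first drawback listed below: silence on the view topic can also mean that KSQL has stalled rather than finished.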

But I see some drawbacks with this solution:

  • It's based on data pushed into the view topic, so it may not reflect reality (if something goes wrong on the KSQL side)
  • I need to pollute the view topic with the ROWTIMEs of all my sources to make it work. It doesn't seem very user-friendly 😀 ... We could improve it a bit by concatenating the ROWTIMEs of all sources into a single field lastEventTime formatted as $AAA_ROWTIME:$BBB_ROWTIME:$CCC_ROWTIME:.... That way I can implement a generic check in my microservice, regardless of how many sources the view is built from.
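On the consuming side, the concatenated lastEventTime field could be handled like this, assuming the view already carries it as colon-separated epoch-millisecond values with an optional trailing colon. The helper names are hypothetical.

```python
from typing import List


def parse_last_event_time(value: str) -> List[int]:
    """Split a 'ts1:ts2:...:' string (trailing ':' allowed) into epoch-ms ints."""
    return [int(part) for part in value.split(":") if part]


def newest_source_time(value: str) -> int:
    """Most recent ROWTIME across however many sources the view is built from."""
    times = parse_last_event_time(value)
    if not times:
        raise ValueError("lastEventTime contains no timestamps")
    return max(times)
```

The result can then feed the same freshness rule regardless of whether the view joins two sources or ten.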

I hope we will find a "proper" solution to handle the local view use case 🔨

I'll close this question as it hasn't been updated for almost a year. If you still have questions about this, feel free to re-open it or open a new issue.

Hey @spena, a few months ago I had a call with @MichaelDrogalis about this. The issue described here was understood, even if Confluent is not working on it (correct me if I'm wrong).

I think it needs to be reopened (I can't do it myself); it would bring real value to KSQL for handling event sourcing on sensitive data.

Thank you,

Hey @spena - can confirm, I think this has advanced from question to feature. I understand what @sneko is asking about, and it's a legitimate need. It is hard to build an app against KSQL when the state stores are being rematerialized, since the stores appear to go backwards in time.

Got it. Thanks @MichaelDrogalis for reopening it. I thought this was an old question.
