KSQL: Broadcast Joins via GlobalKTable

Created on 28 Feb 2018 · 9 comments · Source: confluentinc/ksql

Consider the case of joining a high-volume stream to a much smaller lookup table in KSQL. The high-volume stream likely has many more partitions than the table, but the data from all of them has to be squeezed into a small number of Kafka Streams tasks to actually perform the join. This is exactly the kind of bottleneck the GlobalKTable was added to the Streams API to solve.

This request is to allow a user to specify, probably via an Oracle-style 'hint' in her query syntax, that she wishes a full copy of the lookup table to be 'broadcast' to each processing node. That would allow as many concurrent stream threads to work on the join as there are partitions in the input stream.

Example syntax:

SELECT * FROM BIG_STREAM LEFT JOIN /* BROADCAST */ SMALL_TABLE ON ...

In an ideal future world, the explicit hint would not be required and this optimization would be applied algorithmically, but that has many interesting prerequisite steps...
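The bottleneck described above can be illustrated with a small Python model. This is a sketch under simplifying assumptions, not how KSQL or Kafka Streams is implemented: `toy_partition` is a hypothetical stand-in for Kafka's murmur2-based default partitioner, and all function names and data are invented for illustration. A co-partitioned join repartitions the stream to match the table, so the work lands on at most as many tasks as the table has partitions; a broadcast join leaves the stream's partitioning alone and gives every task a full copy of the table.

```python
from collections import defaultdict

def toy_partition(key, num_partitions):
    # Hypothetical stand-in for Kafka's default (murmur2) partitioner.
    return sum(key.encode("utf-8")) % num_partitions

def copartitioned_left_join(stream_partitions, table, table_partitions):
    # Shard the table by key, as a regular (sharded) table would be.
    shards = [{} for _ in range(table_partitions)]
    for key, value in table.items():
        shards[toy_partition(key, table_partitions)][key] = value
    tasks = defaultdict(list)
    # Repartition every stream record onto the table's partition count.
    for records in stream_partitions:
        for key, value in records:
            task = toy_partition(key, table_partitions)
            tasks[task].append((key, value, shards[task].get(key)))
    return dict(tasks)

def broadcast_left_join(stream_partitions, table):
    tasks = defaultdict(list)
    # Each input partition keeps its own task; each task sees the whole table.
    for task, records in enumerate(stream_partitions):
        replica = table  # conceptually a full local copy on every node
        for key, value in records:
            tasks[task].append((key, value, replica.get(key)))
    return dict(tasks)

def flatten(tasks):
    # Collect joined rows regardless of which task produced them.
    return {row for rows in tasks.values() for row in rows}

# Hypothetical data: 8 stream partitions, a table with 2 partitions.
keys = ["u1", "u2", "u3", "u4"]  # "u4" has no table entry
stream = [[(keys[(p + i) % 4], f"event-{p}-{i}") for i in range(3)] for p in range(8)]
table = {"u1": "gold", "u2": "silver", "u3": "bronze"}

copart = copartitioned_left_join(stream, table, table_partitions=2)
bcast = broadcast_left_join(stream, table)
print(len(copart), len(bcast))  # prints: 2 8
```

In this model both strategies produce the same joined rows (`flatten(copart) == flatten(bcast)`); only the degree of parallelism differs. The model deliberately ignores timing, which the later comments in the thread point out is where a GlobalKTable actually behaves differently.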

Labels: enhancement, popular

All 9 comments

Would it not make more sense to specify the _global_ part when creating the small table, rather than when using it in another statement?

i.e., CREATE GLOBAL TABLE AS...

I think it's both (a) more flexible and (b) far more SQL-like to allow you to specify this at the point of use, i.e., in the join. This way you get to make a table like any other, without having to know ahead of time what kinds of uses it will be put to. Think of this just as a join-strategy hint for this one particular case, not something that changes the nature of the table itself.

(Emailed reply to Andy Coates's comment of Mar 1, 2018, 4:09 AM.)

I'd lean more towards defining it when you create the table. A global table maps to a replicated table in the data warehouse world, so the concepts seem to align nicely.
As a side note, the main benefit of a GlobalKTable is being able to do non-key joins: each stream record can be joined on a lookup key extracted from its value rather than on its own key. So whatever syntax is added would need to support this.
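The non-key join mentioned above corresponds to the Kafka Streams overload `KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)`, where the mapper derives the lookup key from each stream record. The Python sketch below only mimics that shape with plain callables; `global_table_left_join`, the orders/customers data, and the field names are hypothetical. Because every task holds the whole table, the stream never has to be repartitioned by the lookup key.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple

def global_table_left_join(
    stream: Iterable[Tuple[str, dict]],
    global_table: Dict[str, str],
    key_mapper: Callable[[str, dict], str],
    joiner: Callable[[dict, Optional[str]], dict],
) -> List[Tuple[str, dict]]:
    joined = []
    for key, value in stream:
        # Like a KeyValueMapper: pick the lookup key out of the record, not its own key.
        lookup_key = key_mapper(key, value)
        # Like a ValueJoiner in a left join: the table side may be missing (None).
        joined.append((key, joiner(value, global_table.get(lookup_key))))
    return joined

# Hypothetical data: orders keyed by order id, customers keyed by customer id.
customers = {"c1": "Alice", "c2": "Bob"}
orders = [
    ("o1", {"customer_id": "c1", "amount": 10}),
    ("o2", {"customer_id": "c9", "amount": 5}),  # no matching customer
]

result = global_table_left_join(
    orders,
    customers,
    key_mapper=lambda order_id, order: order["customer_id"],
    joiner=lambda order, name: {**order, "customer_name": name},
)
```

The output keeps the stream record's key (`o1`, `o2`), as a stream-GlobalKTable join in Kafka Streams does; only the lookup uses the extracted `customer_id`.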

+1 @dguy

Not sure what would be a better syntax, but definitely support for GlobalKTable is needed in KSQL.

Hi, is there any progress on this feature?

+1. This would really save a lot of time.

+1 This could be super handy for me too. Given guidance on the intended resolution of the open syntax question, I could try to work up a PR as well. Personally, I think CREATE GLOBAL TABLE... seems the most natural.

I think that Kafka Streams would need to support proper time-synchronized replicated tables first. Note that the difference between a KTable and a GlobalKTable is not just about sharding vs. replication, but also about time synchronization vs. no time synchronization.
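The time-synchronization point can be made concrete with a deliberately simplified model. It assumes, for illustration only, that a time-synchronized (KTable-style) join looks up the table state as of the stream record's timestamp, while a GlobalKTable-style join sees whatever the table holds after being bootstrapped to the end of its topic, as happens when historical stream data is processed after the table has been fully loaded. Real Kafka Streams time synchronization is best effort; the function names and data here are hypothetical.

```python
def table_state_as_of(changelog, ts):
    """Replay (timestamp, key, value) updates up to and including ts."""
    state = {}
    for update_ts, key, value in sorted(changelog, key=lambda u: u[0]):
        if update_ts > ts:
            break
        state[key] = value
    return state

def time_synchronized_join(stream, changelog):
    # Each stream record sees the table as it was at the record's own timestamp.
    return [(ts, key, value, table_state_as_of(changelog, ts).get(key))
            for ts, key, value in stream]

def bootstrapped_join(stream, changelog):
    # The whole changelog is loaded before any stream record is processed.
    final_state = table_state_as_of(changelog, float("inf"))
    return [(ts, key, value, final_state.get(key)) for ts, key, value in stream]

# Hypothetical data: user u1 upgrades at t=200; a purchase happened at t=150.
changelog = [(100, "u1", "free"), (200, "u1", "premium")]
stream = [(150, "u1", "purchase")]

print(time_synchronized_join(stream, changelog))  # [(150, 'u1', 'purchase', 'free')]
print(bootstrapped_join(stream, changelog))       # [(150, 'u1', 'purchase', 'premium')]
```

Under this model the two joins disagree on identical input, which is the semantic gap between sharded, time-synchronized tables and bootstrapped replicated ones that the comment describes.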

I also believe that sharding vs. replication is an internal detail that should _not_ leak into the query language. Maybe something like an "optimizer hint" could be used, but I would not recommend introducing GLOBAL TABLE as a first-class citizen: a replicated table and a sharded table are semantically the same and thus should both have the same representation in the language/data model.
