Ksql: Fix PARTITION BY semantics

Created on 10 Mar 2020 · 11 Comments · Source: confluentinc/ksql

The old-style query semantics for PARTITION BY are:

S1: ROWKEY => B, C. (Meaning S1 has a schema with ROWKEY as the key column and B and C as value columns; the types aren't important.)

CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;

S2: ROWKEY => B, C.

As you can see, the schema of S2 is the same as that of S1. However, the old value of the key has been lost, as it has been overwritten with the value from B.
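
To make the data loss concrete, here is a minimal Python model of the old behavior. It is a sketch, not KSQL code: the `Schema` class and `old_partition_by` function are hypothetical names, and a row is modelled simply as a key plus a dict of value columns.

```python
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass(frozen=True)
class Schema:
    """Hypothetical model of a stream schema: one key column plus value columns."""
    key: str
    value: Tuple[str, ...]


def old_partition_by(
    schema: Schema, key: Any, value: Dict[str, Any], column: str
) -> Tuple[Schema, Any, Dict[str, Any]]:
    """Sketch of the old PARTITION BY semantics.

    The schema is returned unchanged (the key column is still ROWKEY) and the
    key data is overwritten with the value of `column`. Nothing carries the
    previous key data forward, so it is lost.
    """
    if column not in schema.value:
        raise ValueError(f"{column} is not a value column")
    return schema, value[column], dict(value)


# S1: ROWKEY => B, C
s1 = Schema(key="ROWKEY", value=("B", "C"))
s2, new_key, new_value = old_partition_by(s1, 1, {"B": 2, "C": 3}, "B")
print(s2 == s1, new_key, new_value)  # True 2 {'B': 2, 'C': 3}
```

The original key value, 1, appears nowhere in the output row.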

With the new primitive-key work, which removes the restriction on key column naming, the same query semantics no longer work. For example:

S1: A => B, C

CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B;

S2: B => B, C

This fails because the B value column clashes with the B key column! Hence, we need to fix the query semantics to remove B from the value schema when doing the PARTITION BY.

It doesn't seem right to 'fix' the query semantics to remove B without also fixing them to add A to the value schema; that is, the schema of S2 should be:

S2: B => A, C
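
The clash and the proposed fix can be modelled the same way. In this hypothetical sketch (plain Python, not KSQL internals), `naive_partition_by` renames the key column but keeps the value schema, which produces the duplicate B. `proposed_partition_by` follows the semantics suggested above: B moves into the key and the old key column A moves into the value. Placing A first in the value schema is an assumption made only for this illustration.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Schema:
    """Hypothetical stream schema: one named key column plus value columns."""
    key: str
    value: Tuple[str, ...]

    def __post_init__(self) -> None:
        # A column name may appear only once across key and value.
        if self.key in self.value:
            raise ValueError(f"column {self.key} clashes between key and value")


def naive_partition_by(schema: Schema, column: str) -> Schema:
    """Old semantics applied to named keys: rename the key, keep the value schema."""
    return Schema(key=column, value=schema.value)


def proposed_partition_by(schema: Schema, column: str) -> Schema:
    """Proposed semantics: `column` becomes the key and the old key moves to the value."""
    if column not in schema.value:
        raise ValueError(f"{column} is not a value column")
    rest = tuple(c for c in schema.value if c != column)
    return Schema(key=column, value=(schema.key,) + rest)


s1 = Schema(key="A", value=("B", "C"))  # S1: A => B, C

try:
    naive_partition_by(s1, "B")  # would be S2: B => B, C
except ValueError as err:
    print(err)  # column B clashes between key and value

s2 = proposed_partition_by(s1, "B")
print(s2)  # Schema(key='B', value=('A', 'C'))
```

Under the proposed semantics the set of columns {A, B, C} is preserved; only which part of the record each column lives in changes.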

All 11 comments

I would have liked to decouple fixing the query semantics from the primitive key work. However, it looks like some of the work will need to be done earlier than anticipated.

cc @derekjn @MichaelDrogalis

It seems reasonable to say that we need https://github.com/confluentinc/ksql/issues/3536 as a prerequisite to supporting JSON/PB/Avro key formats. But then we get the problem described above, and the scope becomes larger: we get into changing query semantics, with all the compatibility implications that brings, especially for hosted flavors of KSQL.

It would be good to brainstorm how to thread this needle. From a product point of view, is it reasonable to drop the dependency on #3536?

Derek and I connected offline to talk about it. We think it's smart to avoid accreting technical debt in this area since it's incredibly fundamental to the project. If we need to enlarge the scope to do it right, we can get in the weeds and help.

Thanks. I think the main thing to decide is whether #3536 is a real blocker for supporting primitive keys serialized with PB/Avro/JSON. As @big-andy-coates noted in that ticket, without allowing key names other than ROWKEY, a JSON-serialized key like {"id": 45} will be used in KSQL as ROWKEY INT KEY. In particular, you can't reference it with the identifier id as you might expect. You also can't create messages with keys named anything other than ROWKEY. Is that a reasonable intermediate product experience?
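
A rough model of the restriction described here, assuming a JSON key that is an object with a single field. The function `key_column` is hypothetical and is not how KSQL's deserializers are implemented; it only shows which column name the key ends up with depending on whether names other than ROWKEY are allowed.

```python
import json
from typing import Any, Tuple


def key_column(raw_key: bytes, allow_named_keys: bool) -> Tuple[str, Any]:
    """Return (column name, value) for a single-field JSON key.

    With allow_named_keys=False (the restriction discussed in #3536), the
    field name in the payload is ignored and the column is always ROWKEY.
    """
    obj = json.loads(raw_key)
    if not isinstance(obj, dict) or len(obj) != 1:
        raise ValueError("this sketch only handles single-field JSON objects")
    (field, value), = obj.items()
    return (field if allow_named_keys else "ROWKEY"), value


raw = b'{"id": 45}'
print(key_column(raw, allow_named_keys=False))  # ('ROWKEY', 45)
print(key_column(raw, allow_named_keys=True))   # ('id', 45)
```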

If the answer to that is 'no', then changing the key name necessitates changing the query semantics, which is going to be much more work.

> You also can't create messages with keys named anything other than ROWKEY. Is that a reasonable intermediate product experience?

@apurvam wouldn't you still be able to assign a row key using any column name (via WITH (key=...)), but just not reference it by anything other than ROWKEY?

> I would have liked to decouple fixing the query semantics from the primitive key work. However, it looks like some of the work will need to be done earlier than anticipated.

I'm not sure to what extent this affects query semantics. In the old model, ROWKEY is stored in the key and <A,B> is stored in the value; however, we know that either A or B is the same as ROWKEY (i.e., the actual data schema is <A,B> and it does not change between S1 and S2).

Thus, the two examples actually use different schemas, <A,B> vs. <A,B,C>, so, as I understand it, the example does not compare apples to apples?

With the new semantics, ROWKEY is no longer a copy of a "data schema" column, and thus just switching A and B from <A, <B,C>> to <B, <A,C>> does not sound like a semantic change to me. The schema for both S1 and S2 is <A,B,C>. (Thus, I don't think generic keys are a requirement here, as PARTITION BY a, b is not supported yet?)

In the old code, the corresponding example would be ROWKEY in the key and <A,B,C> in the value, with ROWKEY == A in S1 and ROWKEY == B in S2, while the "data schema" is always <A,B,C>.

Hence, I think we can just implement it without worrying about a semantic change? Let me know if this makes sense or if I am totally off...

> @apurvam wouldn't you still be able to assign a row key using any column name (via WITH (key=...)), but just not reference it by anything other than ROWKEY?

I think @big-andy-coates should take that one. But even if you could, I'm not sure how it addresses the usability gaps which have been mentioned.

> But even if you could, I'm not sure how it addresses the usability gaps which have been mentioned.

It doesn't; I'm just trying to make sure I understand everything correctly.

@mjsax

> Not sure to what extent this affects query semantics? In the old model, there is ROWKEY that is stored in the key and <A,B> that is stored in the value, however, we know that either A or B is the same as ROWKEY (ie, the actual data schema is <A,B> and it does not change between S1 and S2).

It's not true that 'we know that either A or B is the same as ROWKEY'. ROWKEY could contain completely different data.

-- Schema: ROWKEY => B, C
-- Neither B nor C duplicates the data stored in ROWKEY.
CREATE STREAM FOO (ROWKEY INT KEY, B INT, C INT) WITH (kafka_topic='x', value_format='json');

-- Schema: ROWKEY => B, C
-- The data stored in the old ROWKEY has been lost and overwritten by B.
CREATE STREAM BAR AS SELECT * FROM FOO PARTITION BY B;

I think you're misunderstanding the problem. You're assuming ROWKEY is duplicated in the value - that's not a requirement and never was for streams.

Thanks for clarifying.

It seems the old PARTITION BY, which overwrites ROWKEY with the value of B and loses the original ROWKEY, is already "broken". I agree that it should be fixed. Losing a column when PARTITION BY is used doesn't sound right.

@derekjn

> You also can't create messages with keys named anything other than ROWKEY. Is that a reasonable intermediate product experience?

> @apurvam wouldn't you still be able to assign a row key using any column name (via WITH (key=...)), but just not reference it by anything other than ROWKEY?

The WITH(KEY='A', ...) functionality requires a copy of the key in the value. For many users this won't be the case, so they'd need to pre-process their data with something other than KSQL to get this.
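
For illustration, the kind of pre-processing meant here might look like the sketch below, run outside KSQL before the data reaches the topic: each record's key is copied into a value field so that a stream can then declare WITH (KEY='A', ...). The function name and the record layout (a key plus a dict of value fields) are hypothetical; in practice this would be a producer or stream-processing job writing to a new topic.

```python
from typing import Any, Dict, Iterable, Iterator, Tuple

Record = Tuple[Any, Dict[str, Any]]  # (key, value fields)


def copy_key_into_value(records: Iterable[Record], field: str) -> Iterator[Record]:
    """Yield each record with its key duplicated into value[field].

    Raises if the value already holds a different value for `field`, since
    the copy must match the key for WITH (KEY=...) to be correct.
    """
    for key, value in records:
        if field in value and value[field] != key:
            raise ValueError(f"value field {field!r} disagrees with key {key!r}")
        yield key, {**value, field: key}


raw = [(10, {"B": 1, "C": 2}), (11, {"B": 3, "C": 4})]
print(list(copy_key_into_value(raw, "A")))
# [(10, {'B': 1, 'C': 2, 'A': 10}), (11, {'B': 3, 'C': 4, 'A': 11})]
```

This extra step, and the extra topic it implies, is the burden that makes WITH (KEY=...) a poor substitute for named key columns.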

So, IMHO, this is not a workable solution.
