Ksql: Mismatched topic partition/replica values after server restart

Created on 31 Aug 2019 · 7 comments · Source: confluentinc/ksql

Steps to reproduce:

1. Create a stream and a derived stream:

CREATE STREAM foo(age BIGINT) WITH (KAFKA_TOPIC='foo', partitions=2, VALUE_FORMAT='JSON');
CREATE STREAM foo_copy AS SELECT * FROM foo;

2. Stop the server.
3. Outside of KSQL, delete the FOO_COPY topic and recreate it with partitions=6.
4. Restart the server. DESCRIBE EXTENDED then shows:

ksql> describe extended foo_copy;

Name                 : FOO_COPY
Type                 : STREAM
Key field            : 
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : FOO_COPY (partitions: 6, replication: 1)

 Field   | Type                      
-------------------------------------
 ROWTIME | BIGINT           (system) 
 ROWKEY  | VARCHAR(STRING)  (system) 
 AGE     | BIGINT                    
-------------------------------------

Queries that write from this STREAM
-----------------------------------
CSAS_FOO_COPY_0 : CREATE STREAM FOO_COPY WITH (KAFKA_TOPIC='FOO_COPY', PARTITIONS=2, REPLICAS=1) AS SELECT *
FROM FOO FOO;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------


(Statistics of the local KSQL server interaction with the Kafka topic FOO_COPY)

If the 6-partition FOO_COPY topic already exists when the query is first run, the statement fails with:

A Kafka topic with the name 'FOO_COPY' already exists, with different partition/replica configuration than required. KSQL expects 2 partitions (topic has 6), and 1 replication factor (topic has 1).

SandboxedKafkaTopicClient has a validateTopicProperties() method that checks that the topic's partition and replica values match what the statement requires. This validation is not performed when commands from the command topic are replayed during server restoration, so after a restart the mismatch goes undetected.
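To make the gap concrete, here is a minimal Java sketch under a simplified model of the two code paths. `TopicPropertiesCheck`, `validate` and `handleStatement` are hypothetical names, not KSQL's API, and the `replayingCommandTopic` flag stands in for however the server actually distinguishes restoration from a newly submitted statement. Only the error text is taken from the message quoted above.

```java
// Hypothetical sketch of the check described in this issue. It is NOT the
// actual SandboxedKafkaTopicClient code; class and method names are illustrative.
final class TopicPropertiesCheck {

    private TopicPropertiesCheck() {}

    /** Throws if the existing topic's partitions or replicas differ from what the statement requires. */
    static void validate(String topic, int requiredPartitions, int requiredReplicas,
                         int actualPartitions, int actualReplicas) {
        if (requiredPartitions != actualPartitions || requiredReplicas != actualReplicas) {
            throw new IllegalStateException(String.format(
                "A Kafka topic with the name '%s' already exists, with different "
                    + "partition/replica configuration than required. KSQL expects %d "
                    + "partitions (topic has %d), and %d replication factor (topic has %d).",
                topic, requiredPartitions, actualPartitions, requiredReplicas, actualReplicas));
        }
    }

    /**
     * Models the reported gap: a newly submitted statement is validated, but a
     * statement replayed from the command topic during restoration is not.
     */
    static void handleStatement(boolean replayingCommandTopic, String topic,
                                int requiredPartitions, int requiredReplicas,
                                int actualPartitions, int actualReplicas) {
        if (!replayingCommandTopic) {
            validate(topic, requiredPartitions, requiredReplicas, actualPartitions, actualReplicas);
        }
        // Otherwise the query is started against the existing topic as-is.
    }

    public static void main(String[] args) {
        // Restart scenario from this issue: replay succeeds despite 2 vs 6 partitions.
        handleStatement(true, "FOO_COPY", 2, 1, 6, 1);
        System.out.println("replay: no validation error");
        // First-run scenario: the same mismatch is rejected.
        try {
            handleStatement(false, "FOO_COPY", 2, 1, 6, 1);
        } catch (IllegalStateException e) {
            System.out.println("submit: " + e.getMessage());
        }
    }
}
```

In this model the fix would amount to calling `validate` on both paths; whether failing a restart is actually desirable is exactly what the discussion in the comments questions.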

Related to https://github.com/confluentinc/ksql/issues/2435

Reproduced on the master branch as of 8/30/2019.

bug

All 7 comments

Having a similar issue here.
It also seems that headless server behaves differently than interactive server: if I have a topic foo with partitions=6 and replicas=3 and run CREATE STREAM FOO WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='AVRO'); on both, the interactive server creates the stream successfully whether or not I specify partitions/replicas, while the headless server fails with: KSQL expects x partitions (topic has 6), and y replication factor (topic has 3).

Same issue here.
To be sure, I checked both my ksql.sink.replicas and ksql.sink.partitions configurations, and both are set to NULL.
The same query that fails on the headless server works fine on the interactive server.
The error I get says the values should be 4 and 1 (I think those are the defaults):
KSQL expects 4 partitions (topic has 1), and 1 replication factor (topic has 1).

I'm not sure what the "right" behavior here would be, at least for the case in the original ticket where a user deletes and recreates a topic externally. We want to recover as much as we can independently of this failure (I don't think we should fail the restart if the external state changes), because there may be other, completely unaffected queries. I see this as similar to deleting a file in the filesystem backing a database and then recreating it with a different format: the behavior is, rightly in my opinion, undefined in that case, and you're likely to get unpredictable errors while trying to process that data.

cc @derekjn

@agavra I actually agree with you here and I think your analogy is spot on. I didn't originally realize that this issue involves a manual topic deletion before restarting the server. I'm not sure that there is much we can do here that would be net better than the current behavior. It seems like the best we can do is a more descriptive error message that makes it clear to the user that they need to reconfigure the topic before it's usable by ksqlDB.

OK - in that case I'll be closing this out as won't fix. The DESCRIBE query should return the query status and the error that happened; we can revisit this ticket later if we've miscategorized it.

> I'm not sure what the "right" behavior here would be

I can agree with that if the problem arises specifically when the topic is deleted and recreated, as described by @stevenpyzhang.

But what about the issue mentioned by @YFGu0618?

> It also seems that headless server behaves differently than interactive server:

I encountered the same problem. There seems to be a similar behavioral issue that is not necessarily related to deleting an existing topic.
The same CREATE STREAM statement that works in interactive mode fails in headless mode, for exactly the same reason, whenever the topic's parameters differ from the defaults used by the headless server (4 partitions and 1 replication factor).

It's not that this issue prevents creating streams for existing topics altogether, but it would be nice to have the option of creating some basic pre-configured streams on system startup.
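The headless vs. interactive difference reported in the comments above can be summarized as a small Java model. It is a hypothetical sketch built only from what commenters observed for a CREATE STREAM that omits PARTITIONS/REPLICAS: the interactive server accepts the existing topic, while the headless server compares it against defaults of 4 partitions and 1 replication factor. `HeadlessDefaultsModel` and `createStream` are illustrative names, not ksqlDB code, and the maintainers note below that this behavior was reworked in later versions.

```java
// Hypothetical model of behavior REPORTED in this thread for a CREATE STREAM
// over an existing topic with no PARTITIONS/REPLICAS given. Not ksqlDB code.
final class HeadlessDefaultsModel {

    // Defaults that commenters inferred from the headless error message.
    static final int DEFAULT_PARTITIONS = 4;
    static final int DEFAULT_REPLICAS = 1;

    private HeadlessDefaultsModel() {}

    /** Returns null if the statement succeeds, otherwise the predicted error text. */
    static String createStream(boolean headless, int topicPartitions, int topicReplicas) {
        if (!headless) {
            // Reported: the interactive server accepts the existing topic as-is.
            return null;
        }
        // Reported: headless mode compares the topic against the server defaults.
        if (topicPartitions == DEFAULT_PARTITIONS && topicReplicas == DEFAULT_REPLICAS) {
            return null;
        }
        return String.format(
            "KSQL expects %d partitions (topic has %d), and %d replication factor (topic has %d).",
            DEFAULT_PARTITIONS, topicPartitions, DEFAULT_REPLICAS, topicReplicas);
    }

    public static void main(String[] args) {
        // Interactive mode: accepted.
        System.out.println(createStream(false, 6, 3));
        // Headless mode with a 1-partition topic: the error quoted in the thread.
        System.out.println(createStream(true, 1, 1));
    }
}
```

With a 1-partition, 1-replica topic, `createStream(true, 1, 1)` reproduces the error text quoted in the second comment, while the same topic is accepted when `headless` is false.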

@pavelpe, can you open a new ticket for that so that we can track it separately from this issue? As a quick update, the original ticket indicates that something should fail when it actually succeeds. When you create the new ticket, it would help if you also included the version of KSQL you were using, as we cleaned up a lot of this behavior in more recent versions.
