Describe the bug
I have 2 clusters, Beijing and Guangzhou IDC. I created a namespace and enabled geo-replication.
Under this namespace, I first successfully created a producer in the Beijing IDC on topic tenant-test/n-1/topic-1 with Schema.STRING, but when I create a consumer in Guangzhou with the same schema, it keeps failing.
To Reproduce
Steps to reproduce the behavior:
1. Created the namespace tenant-test/n-1 and enabled geo-replication.
2. Created a producer in the Beijing cluster on topic tenant-test/n-1/topic-1 with Schema.STRING.
3. Created a consumer in the Guangzhou cluster on the same topic with the same schema. It fails with:
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1628)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543)
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107)
at com.kugou.fanxing.starduration.Application.main(Application.java:15)
Caused by: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:673)
at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:97)
at
Additional context
Based on the exception message, I located the following source code:
@Override
public CompletableFuture<Void> checkConsumerCompatibility(String schemaId, SchemaData schemaData,
                                                          SchemaCompatibilityStrategy strategy) {
    return getSchema(schemaId).thenCompose(existingSchema -> {
        if (existingSchema != null && !existingSchema.schema.isDeleted()) {
            if (strategy == SchemaCompatibilityStrategy.BACKWARD ||
                    strategy == SchemaCompatibilityStrategy.FORWARD ||
                    strategy == SchemaCompatibilityStrategy.FORWARD_TRANSITIVE ||
                    strategy == SchemaCompatibilityStrategy.FULL) {
                return checkCompatibilityWithLatest(schemaId, schemaData, SchemaCompatibilityStrategy.BACKWARD);
            } else {
                return checkCompatibilityWithAll(schemaId, schemaData, strategy);
            }
        } else {
            return FutureUtil.failedFuture(new IncompatibleSchemaException("Topic does not have schema to check"));
        }
    });
}
Following the getSchema method, I checked the /schemas path in ZooKeeper and found that there is no schemaInfo for this topic.
I continued reading the source code and found the following code.
Here I would expect hasSchema || isActive() to be false, and ledger.getTotalSize() to be 0 as well, because I haven't sent any messages in cluster B.
@Override
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
    return hasSchema()
        .thenCompose((hasSchema) -> {
            if (hasSchema || isActive() || ledger.getTotalSize() != 0) {
                return checkSchemaCompatibleForConsumer(schema);
            } else {
                return addSchema(schema).thenCompose(schemaVersion ->
                        CompletableFuture.completedFuture(null));
            }
        });
}
@zyllt Thanks for your feedback. I think the cause is the isActive() method. The replicator also creates a subscription on the topic, so isActive() returns true. I think we don't need to check whether there are subscriptions on the topic, since users can create subscriptions with pulsar-admin and geo-replication also creates a subscription on the topic.
I will fix this issue.
@codelipenghui Thanks for your reply. I originally thought the cause was ledger.getTotalSize() being non-zero because of geo-replication, but after your reply I think it is indeed an isActive() problem. I will verify it.
The message has already been replicated to the remote cluster, so ledger.getTotalSize() != 0 is true.
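To make the two explanations above concrete, here is a minimal standalone sketch of the decision in addSchemaIfIdleOrCheckCompatible. It is not Pulsar's code: the class SchemaGateSketch, the TopicState type, and the Outcome enum are hypothetical names invented for illustration, isActive() is modeled simply as "the topic has at least one subscription" (an assumption based on the comment about the replicator subscription), and the compatibility check is reduced to "fails when no schema is stored locally".

```java
public class SchemaGateSketch {

    /** Possible results when a consumer subscribes with a schema (hypothetical names). */
    public enum Outcome { SCHEMA_ADDED, COMPATIBLE, INCOMPATIBLE_NO_SCHEMA }

    /** Hypothetical, simplified snapshot of a topic as seen by the remote cluster's broker. */
    public static final class TopicState {
        final boolean hasSchema;        // is a schema stored in this cluster?
        final int subscriptionCount;    // includes the replicator's subscription, if any
        final long ledgerTotalSize;     // bytes stored, including replicated messages

        public TopicState(boolean hasSchema, int subscriptionCount, long ledgerTotalSize) {
            this.hasSchema = hasSchema;
            this.subscriptionCount = subscriptionCount;
            this.ledgerTotalSize = ledgerTotalSize;
        }

        // Assumption: "active" is modeled as "has at least one subscription".
        boolean isActive() {
            return subscriptionCount > 0;
        }
    }

    /** Mirrors the shape of the quoted condition: hasSchema || isActive() || totalSize != 0. */
    public static Outcome onConsumerSubscribe(TopicState topic) {
        if (topic.hasSchema || topic.isActive() || topic.ledgerTotalSize != 0) {
            // Compatibility-check path. With no local schema this is where
            // "Topic does not have schema to check" is raised in the report.
            // Simplification: an existing schema is treated as always compatible.
            return topic.hasSchema ? Outcome.COMPATIBLE : Outcome.INCOMPATIBLE_NO_SCHEMA;
        }
        // Idle topic with no schema: the consumer's schema is registered.
        return Outcome.SCHEMA_ADDED;
    }

    public static void main(String[] args) {
        TopicState idle = new TopicState(false, 0, 0L);
        TopicState replicatorSubscriptionOnly = new TopicState(false, 1, 0L);
        TopicState replicatedMessagesOnly = new TopicState(false, 0, 1024L);

        System.out.println("idle topic:            " + onConsumerSubscribe(idle));
        System.out.println("replicator subscribed: " + onConsumerSubscribe(replicatorSubscriptionOnly));
        System.out.println("replicated messages:   " + onConsumerSubscribe(replicatedMessagesOnly));
    }
}
```

Under these assumptions, either condition alone (a replicator subscription, or replicated messages in the ledger) is enough to send a schema-less topic down the compatibility-check path, so removing only the isActive() check would not help once messages have been replicated.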
@zyllt @zplinuxlover We need a PIP (Pulsar Improvement Proposal) to fix this issue. Currently, Pulsar does not replicate schemas between Pulsar clusters. The schema info is stored in the local ZooKeeper and BookKeeper, so the new proposal is for replicating schemas between different clusters.