Pulsar: Failed to create Consumer for the first time

Created on 20 Mar 2020  ·  5 Comments  ·  Source: apache/pulsar

Describe the bug
I have two clusters, one in the Beijing IDC and one in the Guangzhou IDC. I created a namespace and enabled geo-replication on it.
Under this namespace, I first successfully created a producer in the Beijing IDC with the topic name tenant-test/n-1/topic-1 and Schema.STRING, but when I create a consumer in Guangzhou with the same schema, it keeps failing.
To Reproduce
Steps to reproduce the behavior:

  1. Deploy clusters A and B, then create the namespace tenant-test/n-1 and enable geo-replication.
  2. Create a producer in cluster A on the topic tenant-test/n-1/topic-1 and send a message.
  3. Create a consumer in cluster B on the same topic.
  4. See the error org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
Screenshots
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1628)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543)
        at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693)
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:303)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107)
        at com.kugou.fanxing.starduration.Application.main(Application.java:15)
Caused by: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
        at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:673)
        at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:97)
        at 

Desktop (please complete the following information):

  • OS:Pulsar 2.5.0

Additional context
Based on the exception, I located the following source code:

    @Override
    public CompletableFuture<Void> checkConsumerCompatibility(String schemaId, SchemaData schemaData,
                                                              SchemaCompatibilityStrategy strategy) {
        return getSchema(schemaId).thenCompose(existingSchema -> {
            if (existingSchema != null && !existingSchema.schema.isDeleted()) {
                    if (strategy == SchemaCompatibilityStrategy.BACKWARD ||
                            strategy == SchemaCompatibilityStrategy.FORWARD ||
                            strategy == SchemaCompatibilityStrategy.FORWARD_TRANSITIVE ||
                            strategy == SchemaCompatibilityStrategy.FULL) {
                        return checkCompatibilityWithLatest(schemaId, schemaData, SchemaCompatibilityStrategy.BACKWARD);
                    } else {
                        return checkCompatibilityWithAll(schemaId, schemaData, strategy);
                    }
            } else {
                return FutureUtil.failedFuture(new IncompatibleSchemaException("Topic does not have schema to check"));
            }
        });
    }

Following the getSchema method, I checked the /schemas path in ZooKeeper and found that there is no schema info for this topic.
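To make the failing branch concrete, here is a minimal sketch of the same control flow using only the JDK. It is not Pulsar code: `LocalSchemaStore`, `checkConsumer`, and the use of `IllegalStateException` in place of `IncompatibleSchemaException` are assumptions made for illustration, and plain string equality stands in for the real compatibility strategies. It assumes each cluster keeps its own schema storage, which matches the empty /schemas path observed in cluster B.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for one cluster's local schema storage; not a Pulsar class.
final class LocalSchemaStore {
    private final Map<String, String> schemas = new ConcurrentHashMap<>();

    void put(String schemaId, String schemaType) {
        schemas.put(schemaId, schemaType);
    }

    // Same shape as checkConsumerCompatibility: look the schema up, then either
    // compare it or fail the future when nothing is stored locally.
    CompletableFuture<Void> checkConsumer(String schemaId, String consumerSchemaType) {
        return CompletableFuture.completedFuture(schemas.get(schemaId))
                .thenCompose(existing -> {
                    if (existing == null) {
                        return failed("Topic does not have schema to check");
                    }
                    // Plain equality stands in for the real compatibility strategies.
                    return existing.equals(consumerSchemaType)
                            ? CompletableFuture.<Void>completedFuture(null)
                            : failed("Incompatible schema");
                });
    }

    private static CompletableFuture<Void> failed(String message) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // IllegalStateException stands in for IncompatibleSchemaException.
        future.completeExceptionally(new IllegalStateException(message));
        return future;
    }

    public static void main(String[] args) {
        String topic = "tenant-test/n-1/topic-1";

        // Cluster A: the producer registered a STRING schema locally.
        LocalSchemaStore clusterA = new LocalSchemaStore();
        clusterA.put(topic, "STRING");
        clusterA.checkConsumer(topic, "STRING").join(); // completes normally

        // Cluster B: nothing was ever registered locally, so the check fails.
        LocalSchemaStore clusterB = new LocalSchemaStore();
        try {
            clusterB.checkConsumer(topic, "STRING").join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

In this model the consumer's schema is identical on both sides; the only difference is whether the local store has an entry, which is enough to produce the error from the report.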
I continued reading the source code and found the following method.
I expected hasSchema || isActive() to be false, and ledger.getTotalSize() to be 0 as well, because I haven't sent any messages in cluster B.

    @Override
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
        return hasSchema()
            .thenCompose((hasSchema) -> {
                    if (hasSchema || isActive() || ledger.getTotalSize() != 0) {
                        return checkSchemaCompatibleForConsumer(schema);
                    } else {
                        return addSchema(schema).thenCompose(schemaVersion ->
                                CompletableFuture.completedFuture(null));
                    }
                });
    }
triage/week-13 type/bug

All 5 comments

@zyllt Thanks for your feedback. I think the cause is the isActive() check. The replicator also creates a subscription on the topic, so isActive() returns true. I don't think we need to check whether there are subscriptions on the topic, since users can create subscriptions with pulsar-admin and geo-replication also creates a subscription on the topic.

I will fix this issue.

@codelipenghui Thanks for your reply. I originally thought the cause was ledger.getTotalSize() being nonzero because of geo-replication, but after your reply I think it is indeed the isActive() check. I will verify it.

The message has already been replicated to the remote cluster, so ledger.getTotalSize() != 0 is true.
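The two explanations above are not mutually exclusive, because the condition in addSchemaIfIdleOrCheckCompatible is a three-way OR. The following is a simplified, hypothetical model of that branch using only the JDK; `ConsumerSchemaPath`, `Path`, and `choose` are illustrative names rather than Pulsar APIs, and the ledger size of 128 is an arbitrary nonzero value.

```java
// Simplified model of the branch in addSchemaIfIdleOrCheckCompatible.
// None of these names exist in Pulsar; they are assumptions for illustration.
final class ConsumerSchemaPath {
    enum Path { ADD_SCHEMA, CHECK_COMPATIBILITY }

    // Mirrors: hasSchema || isActive() || ledger.getTotalSize() != 0
    static Path choose(boolean hasSchema, boolean isActive, long ledgerTotalSize) {
        if (hasSchema || isActive || ledgerTotalSize != 0) {
            return Path.CHECK_COMPATIBILITY;
        }
        return Path.ADD_SCHEMA;
    }

    public static void main(String[] args) {
        // Idle topic with no schema: the consumer's schema is simply registered.
        System.out.println(choose(false, false, 0L));

        // Cluster B as described in this thread: no local schema, but the
        // replicator subscription makes the topic active and the replicated
        // message makes the ledger non-empty. Either one forces the check.
        System.out.println(choose(false, true, 128L));
        System.out.println(choose(false, true, 0L));
        System.out.println(choose(false, false, 128L));
    }
}
```

Under these assumptions, removing only the isActive() term would still send the first consumer in cluster B down the compatibility check once a message has been replicated, and that check fails while the local schema is missing.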

@zyllt @zplinuxlover We need a PIP to fix this issue. Currently, Pulsar does not replicate schemas between clusters. Schema info is stored in the local ZooKeeper and BookKeeper, so the new proposal is to replicate schemas across clusters.
