Pulsar: Unable to delete topic when both a consumer (subscription) and a reader have been connected

Created on 12 Aug 2019 · 6 Comments · Source: apache/pulsar

Describe the bug
After connecting and then closing one subscription and one reader, it is not possible to delete a topic through the Java admin client. The bug does not manifest if only subscriptions or only readers have been connected; it requires a combination of both.

To Reproduce
Steps to reproduce the behavior:

  1. Start a local standalone Pulsar and make sure the admin and broker ports in the test code below match
  2. Run the following Java code to reproduce the error:
package sample;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.annotations.Test;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class ConsumerAndReaderCombinationTest {

    void deleteTenantIfPresent(PulsarAdmin admin, String tenant) throws PulsarAdminException {
        if (admin.tenants().getTenants().contains(tenant)) {
            for (String namespace : admin.namespaces().getNamespaces(tenant)) {
                List<String> existingTopics = admin.namespaces().getTopics(namespace);
                for (String topic : existingTopics) {
                    admin.topics().delete(topic);
                }
                admin.namespaces().deleteNamespace(namespace);
            }
            admin.tenants().deleteTenant(tenant);
        }
    }

    @Test
    public void provokeBug() throws Exception {
        final String adminUrl = "http://localhost:8080";
        final String brokerUrl = "pulsar://localhost:6650";

        final String tenant = "delete-topic-test-magic48295";
        final String namespace = "ns";
        final String topic = "the-topic";
        final String qualifiedTopic = "persistent://" + tenant + "/" + namespace + "/" + topic;

        /*
         * Ensure a clean tenant
         */
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).authentication(new AuthenticationDisabled()).build()) {
            deleteTenantIfPresent(admin, tenant);
            Set<String> allowedClusters = new LinkedHashSet<>();
            allowedClusters.add("standalone");
            admin.tenants().createTenant(tenant, new TenantInfo(Collections.emptySet(), allowedClusters));
            admin.namespaces().createNamespace(tenant + "/" + namespace);
        }

        /*
         * Open and close both a consumer on a subscription and a reader on the same topic
         */
        try (PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl).build()) {
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topic(qualifiedTopic)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .consumerName("testng")
                    .subscriptionName("s1")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                // no need to do anything with the consumer, just close it again
            }
            try (Reader<byte[]> reader = client.newReader().topic(qualifiedTopic).startMessageId(MessageId.earliest).create()) {
                // no need to do anything with the reader, just close it again
            }
        }

        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(adminUrl)
                .readTimeout(3, TimeUnit.SECONDS)
                .authentication(new AuthenticationDisabled())
                .build()) {

            /*
             * When bug is present, this call will throw a TimeoutException (read-timeout) after 3 seconds
             */
            admin.topics().delete(qualifiedTopic);
        }
    }
}
  3. Observe lines similar to the following in the broker log.
16:48:45.579 [pulsar-web-57-3] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://delete-topic-test-magic48295/ns/the-topic][s1] Unsubscribing
16:48:45.579 [pulsar-web-57-3] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://delete-topic-test-magic48295/ns/the-topic][s1] Successfully closed subscription [ManagedCursorImpl{ledger=delete-topic-test-magic48295/ns/persistent/the-topic, name=s1, ackPos=10:-1, readPos=10:0}]
16:48:45.583 [pulsar-web-57-3] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper - [delete-topic-test-magic48295/ns/persistent/the-topic] Remove consumer=s1
16:48:45.592 [ProcessThread(sid:0 cport:2181):] INFO  org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x10000f989b00009 type:delete cxid:0xe3 zxid:0xba txntype:-1 reqpath:n/a Error Path:/ledgers/00/0000 Error:KeeperErrorCode = Directory not empty for /ledgers/00/0000
16:48:45.591 [bookkeeper-ml-workers-OrderedExecutor-2-0] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught 
java.lang.IllegalArgumentException: inconsistent range
    at java.util.concurrent.ConcurrentSkipListMap$SubMap.<init>(ConcurrentSkipListMap.java:2620) ~[?:1.8.0_212]
    at java.util.concurrent.ConcurrentSkipListMap.subMap(ConcurrentSkipListMap.java:2078) ~[?:1.8.0_212]
    at org.apache.bookkeeper.mledger.util.RangeCache.removeRange(RangeCache.java:132) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
    at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.invalidateEntries(EntryCacheImpl.java:151) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$7.operationComplete(ManagedLedgerImpl.java:776) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$7.operationComplete(ManagedLedgerImpl.java:764) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
    at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$13(MetaStoreImplZookeeper.java:308) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-original-2.4.0.jar:2.4.0]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.9.2.jar:4.9.2]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
16:48:45.610 [BookKeeperClientWorker-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [delete-topic-test-magic48295/ns/persistent/the-topic][s1] Deleted cursor ledger 11
16:48:47.557 [pulsar-web-57-11] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.2 - - [12/Aug/2019:16:48:47 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats HTTP/1.1" 200 813 "-" "Pulsar-Java-v2.4.0" 5
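
The stack trace above ends in ConcurrentSkipListMap.subMap, which rejects a range whose lower bound is greater than its upper bound. The sketch below reproduces only that JDK behaviour in isolation. The class and method names are hypothetical and are not taken from Pulsar's RangeCache; why the broker ended up passing such a range is not shown here.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Stand-alone illustration; names are hypothetical, not Pulsar code.
class InconsistentRangeDemo {

    // Removes every key in [first, last] by clearing a subMap view and
    // returns how many entries were removed.
    static int removeRange(ConcurrentSkipListMap<Long, String> map, long first, long last) {
        // subMap throws IllegalArgumentException when first > last.
        ConcurrentNavigableMap<Long, String> view = map.subMap(first, true, last, true);
        int removed = view.size();
        view.clear();
        return removed;
    }

    // Returns the exception message, or null if the removal succeeded.
    static String tryRemoveRange(ConcurrentSkipListMap<Long, String> map, long first, long last) {
        try {
            removeRange(map, first, last);
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    // Builds a small map with keys 1..4.
    static ConcurrentSkipListMap<Long, String> sampleMap() {
        ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
        for (long i = 1; i <= 4; i++) {
            map.put(i, "entry-" + i);
        }
        return map;
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, String> map = sampleMap();
        System.out.println(removeRange(map, 2, 3));    // bounds in order
        System.out.println(tryRemoveRange(map, 3, 2)); // bounds reversed
    }
}
```

With the bounds in order the call removes the two matching entries; with the bounds reversed the JDK throws the same "inconsistent range" IllegalArgumentException that appears in the broker log.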

Expected behavior
It should always be possible to delete a topic that has no connected clients.

Desktop (please complete the following information):

  • OS: macOS
Labels: component/client, type/bug

All 6 comments

I can confirm this is an issue: once a Reader has been seen on a topic, it is not possible to delete any Consumer subscription.

This is a massive issue because it means that we basically cannot use Readers and Consumers combined on a topic, because we will never be able to clean that topic and it will be amassing data forever (until quotas are hit).

Here's a minimal replication case:

  • this is a 4 broker Pulsar cluster (in Kubernetes)
  • accessed via proxied pulsar:// protocol (via the Pulsar Proxy)
  1. subscribe with a Consumer, then stop the Consumer
  2. delete the subscription with admin tool -- this will work
    bin/pulsar-admin topics unsubscribe public/test-seqid/topic-3 -s failover-subscription
  3. subscribe with a Reader, then stop the Reader
  4. subscribe with a Consumer, then stop the Consumer
  5. delete the subscription with admin tool -- this will NOT work (timeout)
    bin/pulsar-admin topics unsubscribe public/test-seqid/topic-3 -s failover-subscription


Client code:

    pulsar =
        PulsarClient.builder()
            .serviceUrl(pulsarUrl)
            .connectionsPerBroker(1) // default: 1
            .connectionTimeout(10, TimeUnit.SECONDS)
            .enableTcpNoDelay(true)
            .keepAliveInterval(999, TimeUnit.DAYS)
            .maxBackoffInterval(5, TimeUnit.SECONDS)
            .startingBackoffInterval(1, TimeUnit.SECONDS)
            .statsInterval(60, TimeUnit.SECONDS)
            .ioThreads(1) // default: 1
            .listenerThreads(1) // default: 1
            .maxConcurrentLookupRequests(5000) // default: 5000
            .maxLookupRequests(50000) // default: 50000
            .maxNumberOfRejectedRequestPerConnection(50) // default: 50
            .operationTimeout(30, TimeUnit.SECONDS) // default: 30 sec
            .build();

Consumer code:

        Consumer<Test> consumer = pulsar
            .newConsumer(Schema.PROTOBUF(Test.class))
            .consumerName("consumer1")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
            .subscriptionType(SubscriptionType.Failover)
            .subscriptionName("failover-subscription")
            .topic("persistent://public/test-seqid/topic-3")
            .subscribe();

    while (true) {
      Message<Test> message = consumer.receive();
      ...

Reader code:

        Reader<Test> reader = pulsar
            .newReader(Schema.PROTOBUF(Test.class))
            .readerName("reader1")
            .topic("persistent://public/test-seqid/topic-3")
            .startMessageId(MessageId.earliest)
            .create();

    while (true) {
      Message<Test> message = reader.readNext();
      ...

(Another separate(?) issue is why the Reader is seeing any previously published data, when retentions on the namespace are zero (both time and size), and all messages were acknowledged. This is confusing.)

@sijie - I think this is not a component/client issue (since the subscription cannot be deleted through the admin cli either)

The problem (inconsistent range) has been fixed as part of fixing #5621.

> why the Reader is seeing any previously published data, when retentions on the namespace are zero (both time and size), and all messages were acknowledged. This is confusing.

@youurayy the data is deleted segment by segment. Until a whole segment has been deleted, the reader will still see the messages in that segment.
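
To make the segment-by-segment point concrete, here is a toy model of it. Everything in it (the Segment class, trim, readFromEarliest, the message ids) is hypothetical and far simpler than Pulsar's managed ledger; it assumes only that a segment is dropped once every message in it has been acknowledged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only; names and structure are hypothetical, not Pulsar code.
class SegmentTrimDemo {

    // A segment holds a contiguous run of message ids [firstId, lastId].
    static final class Segment {
        final long firstId;
        final long lastId;

        Segment(long firstId, long lastId) {
            this.firstId = firstId;
            this.lastId = lastId;
        }
    }

    // Drops only segments in which every message id is <= ackedUpTo.
    // A partially acknowledged segment is kept whole.
    static List<Segment> trim(List<Segment> segments, long ackedUpTo) {
        List<Segment> kept = new ArrayList<>();
        for (Segment s : segments) {
            if (s.lastId > ackedUpTo) {
                kept.add(s);
            }
        }
        return kept;
    }

    // Message ids a reader starting from the earliest position would see.
    static List<Long> readFromEarliest(List<Segment> segments) {
        List<Long> ids = new ArrayList<>();
        for (Segment s : segments) {
            for (long id = s.firstId; id <= s.lastId; id++) {
                ids.add(id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(new Segment(0, 4), new Segment(5, 9));
        // Messages 0..6 are acknowledged, so only the first segment can go.
        List<Segment> kept = trim(segments, 6);
        System.out.println(readFromEarliest(kept));
    }
}
```

In this model the reader still receives ids 5 and 6 even though they were acknowledged, because they live in a segment that has not been deleted as a whole.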

@sijie - is it known when 2.4.2 proper should be out (image pushed)? There have been no new commits on the milestone for the last 7 days.

@youurayy the 2.4.2 release is under voting. Once it passes voting, it will be released. I would expect it to go out around Friday. You can follow the process by subscribing to the [email protected] mailing list, or you can read the community weekly updates (https://streamnative.io/weekly/2019/2019-11/2019-11-22-pulsar-weekly/) as well.
