This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 86d0830 Fix the default retry letter and dead letter topic name (#10129) 86d0830 is described below commit 86d0830b7b173970424266fcf2f065f6df171020 Author: WangJialing <65590138+wangjialing...@users.noreply.github.com> AuthorDate: Sat May 15 10:01:54 2021 +0800 Fix the default retry letter and dead letter topic name (#10129) ### Motivation Fixes #9327 ### Modifications Correct the default retry letter and dead letter topic names so that they are derived from the full topic name --- .../apache/pulsar/client/api/RetryTopicTest.java | 86 +++++++++++++++++++++- .../pulsar/client/impl/ConsumerBuilderImpl.java | 24 +++--- 2 files changed, 98 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 3840efa..fc84a62 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import lombok.Cleanup; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -68,7 +69,85 @@ public class RetryTopicTest extends ProducerConsumerBase { @Cleanup PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) - .topic("persistent://my-property/my-ns/my-subscription-DLQ") + .topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + for (int i = 0; i < sendMessages; i++) { +
producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + + producer.close(); + + int totalReceived = 0; + do { + Message<byte[]> message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + + Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + } + + //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed + @Test + public void testRetryTopicNameForCompatibility () throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic"; + + final String oldRetryTopic = "persistent://my-property/my-ns/my-subscription-RETRY"; + + final String oldDeadLetterTopic = "persistent://my-property/my-ns/my-subscription-DLQ"; + + final int maxRedeliveryCount = 2; + + final int sendMessages = 100; + + admin.topics().createPartitionedTopic(oldRetryTopic, 2); + admin.topics().createPartitionedTopic(oldDeadLetterTopic, 2); + + 
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic(oldDeadLetterTopic) .subscriptionName("my-subscription") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -116,6 +195,7 @@ public class RetryTopicTest extends ProducerConsumerBase { assertNull(checkMessage); checkConsumer.close(); + newPulsarClient.close(); } /** @@ -145,7 +225,7 @@ public class RetryTopicTest extends ProducerConsumerBase { // subscribe to the DLQ topics before consuming original topics Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) - .topic("persistent://my-property/my-ns/my-subscription-DLQ") + .topic("persistent://my-property/my-ns/retry-topic-1-my-subscription-DLQ") .subscriptionName("my-subscription") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -224,7 +304,7 @@ public class RetryTopicTest extends ProducerConsumerBase { @Cleanup PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) - .topic("persistent://my-property/my-ns/my-subscription-DLQ") + .topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ") .subscriptionName("my-subscription") .subscribe(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 76b53ec..47f9991 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -28,9 +27,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.Collectors; - import lombok.AccessLevel; import lombok.Getter; +import lombok.NonNull; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; @@ -44,22 +43,18 @@ import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; +import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; -import com.google.common.collect.Lists; -import lombok.NonNull; - @Getter(AccessLevel.PUBLIC) 
public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { @@ -121,8 +116,19 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { } if(conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) { TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next()); - String retryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; - String deadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; + String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; + String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; + + //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed + String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; + String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; + if (client.getPartitionedTopicMetadata(oldRetryLetterTopic).join().partitions > 0) { + retryLetterTopic = oldRetryLetterTopic; + } + if (client.getPartitionedTopicMetadata(oldDeadLetterTopic).join().partitions > 0) { + deadLetterTopic = oldDeadLetterTopic; + } + if(conf.getDeadLetterPolicy() == null) { conf.setDeadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)