sijie closed pull request #2508: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2508
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 883bf5553e..5752034a79 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -265,7 +265,8 @@ public SendMessageInfo sendMessages(final List<Entry> 
entries, SendListener list
                 if (i == (entries.size() - 1)) {
                     promise = writePromise;
                 }
-                ctx.write(Commands.newMessage(consumerId, messageId, 
metadataAndPayload), promise);
+                int redeliveryCount = 
subscription.getDispatcher().getRedeliveryTracker().getRedeliveryCount(PositionImpl.get(messageId.getLedgerId(),
 messageId.getEntryId()));
+                ctx.write(Commands.newMessage(consumerId, messageId, 
redeliveryCount, metadataAndPayload), promise);
                 messageId.recycle();
                 messageIdBuilder.recycle();
                 entry.release();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 0d7b7d6e55..43f65cd73b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -70,4 +70,6 @@
 
     void addUnAckedMessages(int unAckMessages);
 
+    RedeliveryTracker getRedeliveryTracker();
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
new file mode 100644
index 0000000000..99b38ccf2f
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.mledger.Position;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link RedeliveryTracker} that keeps one redelivery counter per position in
+ * a concurrent in-memory map.
+ *
+ * <p>A counter is created the first time a position is incremented and is
+ * dropped by {@link #remove(Position)}, {@link #removeBatch(List)} or
+ * {@link #clear()}. Nothing is stored outside this object.
+ */
+public class InMemoryRedeliveryTracker implements RedeliveryTracker {
+
+    private final ConcurrentHashMap<Position, AtomicInteger> trackerCache =
+            new ConcurrentHashMap<>(16);
+
+    /**
+     * Adds one to the redelivery counter of {@code position}, creating the
+     * counter at zero first if the position is not tracked yet.
+     *
+     * @param position position of the message being redelivered
+     * @return the counter value after the increment
+     */
+    @Override
+    public int incrementAndGetRedeliveryCount(Position position) {
+        // Look up or create the counter in one atomic map operation. A separate
+        // putIfAbsent() followed by get() can observe null, and then throw a
+        // NullPointerException, if remove()/clear() runs between the two calls.
+        return trackerCache.computeIfAbsent(position, key -> new AtomicInteger(0))
+                .incrementAndGet();
+    }
+
+    /**
+     * Reads the redelivery counter of {@code position} without modifying it.
+     *
+     * @param position position of the message to look up
+     * @return the current counter value, or 0 if the position is not tracked
+     */
+    @Override
+    public int getRedeliveryCount(Position position) {
+        // Plain lookup instead of getOrDefault(position, new AtomicInteger(0)):
+        // the default object would be allocated on every call, hit or miss.
+        AtomicInteger count = trackerCache.get(position);
+        return count == null ? 0 : count.get();
+    }
+
+    /**
+     * Drops the counter kept for {@code position}, if any.
+     *
+     * @param position position to stop tracking
+     */
+    @Override
+    public void remove(Position position) {
+        trackerCache.remove(position);
+    }
+
+    /**
+     * Drops the counters of all given positions.
+     *
+     * @param positions positions to stop tracking; {@code null} is ignored
+     */
+    @Override
+    public void removeBatch(List<Position> positions) {
+        if (positions != null) {
+            positions.forEach(this::remove);
+        }
+    }
+
+    /**
+     * Drops every counter held by this tracker.
+     */
+    @Override
+    public void clear() {
+        trackerCache.clear();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
new file mode 100644
index 0000000000..0f2e54a542
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.mledger.Position;
+
+import java.util.List;
+
+/**
+ * Per-position redelivery counter exposed by a dispatcher through
+ * {@code Dispatcher#getRedeliveryTracker()}.
+ */
+public interface RedeliveryTracker {
+
+    /**
+     * Records one more redelivery of {@code position}.
+     *
+     * @param position position of the message being redelivered
+     * @return the count after the increment; the disabled implementation in
+     *         this package always returns 0
+     */
+    int incrementAndGetRedeliveryCount(Position position);
+
+    /**
+     * Reads the redelivery count of {@code position} without changing it.
+     *
+     * @param position position of the message to look up
+     * @return the recorded count; both implementations in this package return
+     *         0 for a position that was never incremented
+     */
+    int getRedeliveryCount(Position position);
+
+    /**
+     * Discards the count kept for {@code position}.
+     *
+     * @param position position to stop tracking
+     */
+    void remove(Position position);
+
+    /**
+     * Discards the counts kept for every position in {@code positions}.
+     *
+     * @param positions positions to stop tracking
+     */
+    void removeBatch(List<Position> positions);
+
+    /**
+     * Discards all recorded counts.
+     */
+    void clear();
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
new file mode 100644
index 0000000000..521417f535
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.mledger.Position;
+
+import java.util.List;
+
+/**
+ * {@link RedeliveryTracker} for dispatchers that do not track redeliveries:
+ * every count is reported as 0 and every mutating call is ignored.
+ *
+ * <p>The class holds no state, so one shared instance,
+ * {@link #REDELIVERY_TRACKER_DISABLED}, serves all callers. It is declared
+ * {@code final} because the private constructor already rules out subclasses.
+ */
+public final class RedeliveryTrackerDisabled implements RedeliveryTracker {
+
+    /** The only instance of this class. */
+    public static final RedeliveryTrackerDisabled REDELIVERY_TRACKER_DISABLED =
+            new RedeliveryTrackerDisabled();
+
+    private RedeliveryTrackerDisabled() {
+        // singleton: use REDELIVERY_TRACKER_DISABLED
+    }
+
+    /**
+     * Does not record anything.
+     *
+     * @param position ignored
+     * @return always 0
+     */
+    @Override
+    public int incrementAndGetRedeliveryCount(Position position) {
+        return 0;
+    }
+
+    /**
+     * Reports that the message was never redelivered.
+     *
+     * @param position ignored
+     * @return always 0
+     */
+    @Override
+    public int getRedeliveryCount(Position position) {
+        return 0;
+    }
+
+    /**
+     * Does nothing; there is no count to discard.
+     *
+     * @param position ignored
+     */
+    @Override
+    public void remove(Position position) {
+        // no-op
+    }
+
+    /**
+     * Does nothing; there are no counts to discard.
+     *
+     * @param positions ignored
+     */
+    @Override
+    public void removeBatch(List<Position> positions) {
+        // no-op
+    }
+
+    /**
+     * Does nothing; there are no counts to discard.
+     */
+    @Override
+    public void clear() {
+        // no-op
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index f40564bf69..2067a80d8e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -31,6 +31,8 @@
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
@@ -54,6 +56,7 @@
     private volatile int totalAvailablePermits = 0;
 
     private final ServiceConfiguration serviceConfig;
+    private final RedeliveryTracker redeliveryTracker;
 
     public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, 
Subscription subscription) {
         this.topic = topic;
@@ -61,6 +64,7 @@ public 
NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscr
         this.name = topic.getName() + " / " + subscription.getName();
         this.msgDrop = new Rate();
         this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
+        this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
     @Override
@@ -178,6 +182,11 @@ public SubType getType() {
         return SubType.Shared;
     }
 
+    /**
+     * Returns the tracker assigned in the constructor, which for this
+     * dispatcher is {@link RedeliveryTrackerDisabled#REDELIVERY_TRACKER_DISABLED}.
+     */
+    @Override
+    public RedeliveryTracker getRedeliveryTracker() {
+        return redeliveryTracker;
+    }
+
     @Override
     public void sendMessages(List<Entry> entries) {
         Consumer consumer = TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0 ? 
getNextConsumer() : null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 2083cc7864..787fb00a94 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -29,6 +29,8 @@
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -40,6 +42,7 @@
     private final Rate msgDrop;
     private final Subscription subscription;
     private final ServiceConfiguration serviceConfig;
+    private final RedeliveryTracker redeliveryTracker;
 
     public NonPersistentDispatcherSingleActiveConsumer(SubType 
subscriptionType, int partitionIndex,
             NonPersistentTopic topic, Subscription subscription) {
@@ -48,6 +51,7 @@ public NonPersistentDispatcherSingleActiveConsumer(SubType 
subscriptionType, int
         this.subscription = subscription;
         this.msgDrop = new Rate();
         this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
+        this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
     @Override
@@ -117,6 +121,11 @@ public void consumerFlow(Consumer consumer, int 
additionalNumberOfMessages) {
         // No-op
     }
 
+    /**
+     * Returns the tracker assigned in the constructor, which for this
+     * dispatcher is {@link RedeliveryTrackerDisabled#REDELIVERY_TRACKER_DISABLED}.
+     */
+    @Override
+    public RedeliveryTracker getRedeliveryTracker() {
+        return redeliveryTracker;
+    }
+
     @Override
     protected void scheduleReadOnActiveConsumer() {
         // No-op
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 17c5db7a6a..5198a139ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -44,6 +44,8 @@
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -69,6 +71,7 @@
 
     private CompletableFuture<Void> closeFuture = null;
     private ConcurrentLongPairSet messagesToReplay;
+    private final RedeliveryTracker redeliveryTracker;
 
     private boolean havePendingRead = false;
     private boolean havePendingReplayRead = false;
@@ -97,6 +100,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic 
topic, ManagedCurso
         this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
         this.topic = topic;
         this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
+        this.redeliveryTracker = new InMemoryRedeliveryTracker();
         this.readBatchSize = MaxReadBatchSize;
         this.maxUnackedMessages = 
topic.getBrokerService().pulsar().getConfiguration()
                 .getMaxUnackedMessagesPerSubscription();
@@ -556,7 +560,10 @@ public synchronized void 
redeliverUnacknowledgedMessages(Consumer consumer) {
 
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, List<PositionImpl> positions) {
-        positions.forEach(position -> 
messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
+        positions.forEach(position -> {
+            messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+            redeliveryTracker.incrementAndGetRedeliveryCount(position);
+        });
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for 
consumer {}", name, consumer, positions);
         }
@@ -624,5 +631,10 @@ public DispatchRateLimiter getDispatchRateLimiter() {
         return dispatchRateLimiter;
     }
 
+    /**
+     * Returns this dispatcher's tracker, the {@link InMemoryRedeliveryTracker}
+     * created in the constructor.
+     */
+    @Override
+    public RedeliveryTracker getRedeliveryTracker() {
+        return redeliveryTracker;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 9ab7b877c0..89dfb47331 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -39,6 +39,8 @@
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -62,6 +64,8 @@
     private final ServiceConfiguration serviceConfig;
     private ScheduledFuture<?> readOnActiveConsumerTask = null;
 
+    private final RedeliveryTracker redeliveryTracker;
+
     public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, 
SubType subscriptionType, int partitionIndex,
             PersistentTopic topic) {
         super(subscriptionType, partitionIndex, topic.getName());
@@ -72,6 +76,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor 
cursor, SubType su
         this.readBatchSize = MaxReadBatchSize;
         this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
         this.dispatchRateLimiter = null;
+        this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
     protected void scheduleReadOnActiveConsumer() {
@@ -307,6 +312,7 @@ private synchronized void 
internalRedeliverUnacknowledgedMessages(Consumer consu
     @Override
     public void redeliverUnacknowledgedMessages(Consumer consumer, 
List<PositionImpl> positions) {
         // We cannot redeliver single messages to single consumers to preserve 
ordering.
+        positions.forEach(redeliveryTracker::incrementAndGetRedeliveryCount);
         redeliverUnacknowledgedMessages(consumer);
     }
 
@@ -485,5 +491,10 @@ public void addUnAckedMessages(int unAckMessages) {
         // No-op
     }
 
+    /**
+     * Returns the tracker assigned in the constructor, which for this
+     * dispatcher is {@link RedeliveryTrackerDisabled#REDELIVERY_TRACKER_DISABLED}.
+     */
+    @Override
+    public RedeliveryTracker getRedeliveryTracker() {
+        return redeliveryTracker;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index dac9f055d7..669c6d41d4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -195,6 +195,7 @@ public void acknowledgeMessage(List<Position> positions, 
AckType ackType, Map<St
                 log.debug("[{}][{}] Individual acks on {}", topicName, 
subName, positions);
             }
             cursor.asyncDelete(positions, deleteCallback, positions);
+            dispatcher.getRedeliveryTracker().removeBatch(positions);
         }
 
         if (topic.getManagedLedger().isTerminated() && 
cursor.getNumberOfEntriesInBacklog() == 0) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 99aec9355b..bff1043ba9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -178,7 +178,7 @@ private void reset() {
         }
 
         @Override
-        void messageReceived(MessageIdData messageId, ByteBuf 
headersAndPayload, ClientCnx cnx) {
+        void messageReceived(MessageIdData messageId, int redeliveryCount, 
ByteBuf headersAndPayload, ClientCnx cnx) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, 
subscription,
                           messageId.getEntryId(), messageId.getLedgerId(), 
messageId.getPartition());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
new file mode 100644
index 0000000000..114152af07
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNull;
+
+/**
+ * End-to-end tests for the consumer dead letter policy.
+ *
+ * <p>Each test publishes messages, receives them on a Shared subscription
+ * without ever acknowledging, then reads the same number of messages from the
+ * dead letter topic and finally checks that a fresh consumer on the original
+ * subscription receives nothing within a short timeout.
+ */
+public class DeadLetterTopicTest extends ProducerConsumerBase {
+
+    private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class);
+
+    /** Subscription name shared by the source, dead letter and check consumers. */
+    private static final String SUBSCRIPTION = "my-subscription";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /** Single topic, default dead letter topic name. */
+    @Test
+    public void testDeadLetterTopic() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 100;
+
+        publish(topic, sendMessages);
+
+        Consumer<byte[]> consumer = subscribeWithDeadLetterPolicy(
+                DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build(),
+                topic);
+        Consumer<byte[]> deadLetterConsumer = subscribeToDeadLetterTopics(
+                "persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ");
+
+        drainAndClose(consumer, deadLetterConsumer, sendMessages, maxRedeliveryCount);
+        assertNothingLeft(topic);
+    }
+
+    /** One consumer over two topics, each with its own default dead letter topic. */
+    @Test
+    public void testDeadLetterTopicWithMultiTopic() throws Exception {
+        final String topic1 = "persistent://my-property/my-ns/dead-letter-topic-1";
+        final String topic2 = "persistent://my-property/my-ns/dead-letter-topic-2";
+        final int maxRedeliveryCount = 2;
+        final int messagesPerTopic = 100;
+        final int sendMessages = messagesPerTopic * 2;
+
+        Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic1)
+                .create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic2)
+                .create();
+        // Alternate between the two topics for every message index.
+        for (int i = 0; i < messagesPerTopic; i++) {
+            producer1.send(String.format("Hello Pulsar [%d]", i).getBytes());
+            producer2.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer1.close();
+        producer2.close();
+
+        Consumer<byte[]> consumer = subscribeWithDeadLetterPolicy(
+                DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build(),
+                topic1, topic2);
+        Consumer<byte[]> deadLetterConsumer = subscribeToDeadLetterTopics(
+                "persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ",
+                "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ");
+
+        drainAndClose(consumer, deadLetterConsumer, sendMessages, maxRedeliveryCount);
+        assertNothingLeft(topic1, topic2);
+    }
+
+    /** Single topic, dead letter topic name set explicitly in the policy. */
+    @Test
+    public void testDeadLetterTopicByCustomTopicName() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+        final String deadLetterTopic =
+                "persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 100;
+
+        publish(topic, sendMessages);
+
+        Consumer<byte[]> consumer = subscribeWithDeadLetterPolicy(
+                DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .deadLetterTopic(deadLetterTopic)
+                        .build(),
+                topic);
+        Consumer<byte[]> deadLetterConsumer = subscribeToDeadLetterTopics(deadLetterTopic);
+
+        drainAndClose(consumer, deadLetterConsumer, sendMessages, maxRedeliveryCount);
+        assertNothingLeft(topic);
+    }
+
+    /** Sends {@code count} numbered messages to {@code topic} and closes the producer. */
+    private void publish(String topic, int count) throws Exception {
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < count; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+    }
+
+    /**
+     * Subscribes a Shared consumer with a 3 second ack timeout, a receiver queue
+     * of 100 and the given dead letter policy, starting from the earliest message.
+     */
+    private Consumer<byte[]> subscribeWithDeadLetterPolicy(DeadLetterPolicy policy, String... topics)
+            throws Exception {
+        return pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topics)
+                .subscriptionName(SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .deadLetterPolicy(policy)
+                .receiverQueueSize(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+    }
+
+    /** Subscribes a consumer with default settings to the given dead letter topics. */
+    private Consumer<byte[]> subscribeToDeadLetterTopics(String... deadLetterTopics) throws Exception {
+        return pulsarClient.newConsumer(Schema.BYTES)
+                .topic(deadLetterTopics)
+                .subscriptionName(SUBSCRIPTION)
+                .subscribe();
+    }
+
+    /**
+     * Receives {@code sendMessages * (maxRedeliveryCount + 1)} messages from
+     * {@code consumer} without acknowledging any, then receives and acknowledges
+     * {@code sendMessages} messages from {@code deadLetterConsumer}, and closes
+     * both consumers (dead letter consumer first).
+     */
+    private void drainAndClose(Consumer<byte[]> consumer, Consumer<byte[]> deadLetterConsumer,
+            int sendMessages, int maxRedeliveryCount) throws Exception {
+        final int expectedDeliveries = sendMessages * (maxRedeliveryCount + 1);
+        for (int received = 0; received < expectedDeliveries; received++) {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}",
+                    message.getMessageId(), new String(message.getData()));
+        }
+
+        for (int deadLettered = 0; deadLettered < sendMessages; deadLettered++) {
+            Message<byte[]> message = deadLetterConsumer.receive();
+            log.info("dead letter consumer received message : {} {}",
+                    message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+        }
+
+        deadLetterConsumer.close();
+        consumer.close();
+    }
+
+    /**
+     * Re-subscribes to {@code topics} on the shared subscription and asserts that
+     * no message is received within 3 seconds.
+     */
+    private void assertNothingLeft(String... topics) throws Exception {
+        Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topics)
+                .subscriptionName(SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        try {
+            Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+            if (checkMessage != null) {
+                log.info("check consumer received message : {} {}",
+                        checkMessage.getMessageId(), new String(checkMessage.getData()));
+            }
+            assertNull(checkMessage);
+        } finally {
+            // Close even when the assertion fails so the consumer is not leaked.
+            checkConsumer.close();
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
index 85dbb245b9..de1b88a21f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
@@ -73,7 +73,7 @@ public void testCompactedOutMessages() throws Exception {
              = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic1)
                 .subscriptionName("my-subscriber-name").subscribe()) {
             // shove it in the sideways
-            consumer.receiveIndividualMessagesFromBatch(metadata, batchBuffer,
+            consumer.receiveIndividualMessagesFromBatch(metadata, 0, 
batchBuffer,
                                                         
MessageIdData.newBuilder().setLedgerId(1234)
                                                         
.setEntryId(567).build(), consumer.cnx());
             Message<?> m = consumer.receive();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index da1cbb5d17..d94175846a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -338,4 +338,28 @@
      * @return consumer builder.
      */
     ConsumerBuilder<T> intercept(ConsumerInterceptor<T> ...interceptors);
+
+    /**
+     * Sets the dead letter policy for the consumer.
+     *
+     * By default, a message may be redelivered as many times as possible, even to 
the extent that redelivery never stops.
+     * With the dead letter mechanism, messages have a maximum redelivery 
count; when a message exceeds the maximum
+     * number of redeliveries, it is sent to the Dead Letter Topic and 
acknowledged automatically.
+     *
+     * You can enable the dead letter mechanism by setting dead letter policy.
+     * example:
+     * <pre>
+     * client.newConsumer()
+     *          
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
+     *          .subscribe();
+     * </pre>
+     * Default dead letter topic name is {TopicName}-{Subscription}-DLQ.
+     * To set a custom dead letter topic name:
+     * <pre>
+     * client.newConsumer()
+     *          
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("your-topic-name").build())
+     *          .subscribe();
+     * </pre>
+     */
+    ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
new file mode 100644
index 0000000000..52a2a23b7f
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Builder
+@Data
+public class DeadLetterPolicy {
+
+    private int maxRedeliverCount;
+
+    private String deadLetterTopic;
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 7287194f62..9306a8248b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -284,7 +284,7 @@ protected void handleMessage(CommandMessage cmdMessage, 
ByteBuf headersAndPayloa
         }
         ConsumerImpl<?> consumer = consumers.get(cmdMessage.getConsumerId());
         if (consumer != null) {
-            consumer.messageReceived(cmdMessage.getMessageId(), 
headersAndPayload, this);
+            consumer.messageReceived(cmdMessage.getMessageId(), 
cmdMessage.getRedeliveryCount(), headersAndPayload, this);
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 2095babbb1..103bb5e847 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -44,6 +44,7 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.common.util.FutureUtil;
 
 import com.google.common.collect.Lists;
@@ -256,6 +257,12 @@
         return this;
     }
 
+    @Override
+    public ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy 
deadLetterPolicy) {
+        conf.setDeadLetterPolicy(deadLetterPolicy);
+        return this;
+    }
+
     public ConsumerConfigurationData<T> getConf() {
            return conf;
        }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a0a231902f..b7f9918d92 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,14 +27,12 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import static java.util.Base64.getEncoder;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +41,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -54,11 +53,13 @@
 import java.util.stream.Collectors;
 
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -81,6 +82,7 @@
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,6 +135,12 @@
 
     private final String topicNameWithoutPartition;
 
+    private ConcurrentHashMap<MessageIdImpl, List<MessageImpl<T>>> 
possibleSendToDeadLetterTopicMessages;
+
+    private DeadLetterPolicy deadLetterPolicy;
+
+    private Producer<T> deadLetterProducer;
+
     enum SubscriptionMode {
         // Make the subscription to be backed by a durable cursor that will 
retain messages and persist the current
         // position
@@ -205,6 +213,21 @@
                 NonPersistentAcknowledgmentGroupingTracker.of();
         }
 
+        if (conf.getDeadLetterPolicy() != null) {
+            possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<>();
+            if 
(StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
+                this.deadLetterPolicy = DeadLetterPolicy.builder()
+                        
.maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
+                        
.deadLetterTopic(conf.getDeadLetterPolicy().getDeadLetterTopic())
+                        .build();
+            } else {
+                this.deadLetterPolicy = DeadLetterPolicy.builder()
+                        
.maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
+                        .deadLetterTopic(String.format("%s-%s-DLQ", topic, 
subscription))
+                        .build();
+            }
+        }
+
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
         grabCnx();
@@ -233,6 +256,9 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
             cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
                 cnx.removeConsumer(consumerId);
                 unAckedMessageTracker.close();
+                if (possibleSendToDeadLetterTopicMessages != null) {
+                    possibleSendToDeadLetterTopicMessages.clear();
+                }
                 client.cleanupConsumer(ConsumerImpl.this);
                 log.info("[{}][{}] Successfully unsubscribed from topic", 
topic, subscription);
                 unsubscribeFuture.complete(null);
@@ -448,9 +474,16 @@ boolean markAckForBatchMessage(BatchMessageIdImpl 
batchMessageId, AckType ackTyp
                 stats.incrementNumAcksSent(batchMessageId.getBatchSize());
                 unAckedMessageTracker.remove(new 
MessageIdImpl(batchMessageId.getLedgerId(),
                         batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
+                if (possibleSendToDeadLetterTopicMessages != null) {
+                    possibleSendToDeadLetterTopicMessages.remove(new 
MessageIdImpl(batchMessageId.getLedgerId(),
+                            batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
+                }
             } else {
                 // increment counter by 1 for non-batch msg
                 unAckedMessageTracker.remove(msgId);
+                if (possibleSendToDeadLetterTopicMessages != null) {
+                    possibleSendToDeadLetterTopicMessages.remove(msgId);
+                }
                 stats.incrementNumAcksSent(1);
             }
             onAcknowledge(messageId, null);
@@ -479,7 +512,9 @@ public void connectionOpened(final ClientCnx cnx) {
         synchronized (this) {
             currentSize = incomingMessages.size();
             startMessageId = clearReceiverQueue();
-            unAckedMessageTracker.clear();
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.clear();
+            }
         }
 
         boolean isDurable = subscriptionMode == SubscriptionMode.Durable;
@@ -623,6 +658,9 @@ public void connectionFailed(PulsarClientException 
exception) {
     public CompletableFuture<Void> closeAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
             unAckedMessageTracker.close();
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.clear();
+            }
             return CompletableFuture.completedFuture(null);
         }
 
@@ -630,6 +668,9 @@ public void connectionFailed(PulsarClientException 
exception) {
             log.info("[{}] [{}] Closed Consumer (not connected)", topic, 
subscription);
             setState(State.Closed);
             unAckedMessageTracker.close();
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.clear();
+            }
             client.cleanupConsumer(this);
             return CompletableFuture.completedFuture(null);
         }
@@ -651,6 +692,9 @@ public void connectionFailed(PulsarClientException 
exception) {
                 log.info("[{}] [{}] Closed consumer", topic, subscription);
                 setState(State.Closed);
                 unAckedMessageTracker.close();
+                if (possibleSendToDeadLetterTopicMessages != null) {
+                    possibleSendToDeadLetterTopicMessages.clear();
+                }
                 closeFuture.complete(null);
                 client.cleanupConsumer(this);
                 // fail all pending-receive futures to notify application
@@ -697,7 +741,7 @@ void activeConsumerChanged(boolean isActive) {
         });
     }
 
-    void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, 
ClientCnx cnx) {
+    void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf 
headersAndPayload, ClientCnx cnx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Received message: {}/{}", topic, subscription, 
messageId.getLedgerId(),
                     messageId.getEntryId());
@@ -756,6 +800,7 @@ void messageReceived(MessageIdData messageId, ByteBuf 
headersAndPayload, ClientC
         if (isMessageUndecryptable || (numMessages == 1 && 
!msgMetadata.hasNumMessagesInBatch())) {
             final MessageImpl<T> message = new MessageImpl<>(msgId, 
msgMetadata, uncompressedPayload,
                     createEncryptionContext(msgMetadata), cnx, schema);
+
             uncompressedPayload.release();
             msgMetadata.recycle();
 
@@ -765,6 +810,9 @@ void messageReceived(MessageIdData messageId, ByteBuf 
headersAndPayload, ClientC
                 // if the conf.getReceiverQueueSize() is 0 then discard 
message if no one is waiting for it.
                 // if asyncReceive is waiting then notify callback without 
adding to incomingMessages queue
                 unAckedMessageTracker.add((MessageIdImpl) 
message.getMessageId());
+                if (deadLetterPolicy != null && 
possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= 
deadLetterPolicy.getMaxRedeliverCount()) {
+                    
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
 Collections.singletonList(message));
+                }
                 if (!pendingReceives.isEmpty()) {
                     notifyPendingReceivedCallback(message, null);
                 } else if (conf.getReceiverQueueSize() != 0 || 
waitingOnReceiveForZeroQueueSize) {
@@ -791,7 +839,7 @@ void messageReceived(MessageIdData messageId, ByteBuf 
headersAndPayload, ClientC
                 });
             } else {
                 // handle batch message enqueuing; uncompressed payload has 
all messages in batch
-                receiveIndividualMessagesFromBatch(msgMetadata, 
uncompressedPayload, messageId, cnx);
+                receiveIndividualMessagesFromBatch(msgMetadata, 
redeliveryCount, uncompressedPayload, messageId, cnx);
             }
             uncompressedPayload.release();
             msgMetadata.recycle();
@@ -881,7 +929,7 @@ private void triggerZeroQueueSizeListener(final Message<T> 
message) {
         });
     }
 
-    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, 
ByteBuf uncompressedPayload,
+    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int 
redeliveryCount, ByteBuf uncompressedPayload,
             MessageIdData messageId, ClientCnx cnx) {
         int batchSize = msgMetadata.getNumMessagesInBatch();
 
@@ -890,7 +938,10 @@ void receiveIndividualMessagesFromBatch(MessageMetadata 
msgMetadata, ByteBuf unc
                 getPartitionIndex());
         BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
         unAckedMessageTracker.add(batchMessage);
-
+        List<MessageImpl<T>> possibleToDeadLetter = null;
+        if (deadLetterPolicy != null && redeliveryCount >= 
deadLetterPolicy.getMaxRedeliverCount()) {
+            possibleToDeadLetter = new ArrayList<>();
+        }
         int skippedMessages = 0;
         try {
             for (int i = 0; i < batchSize; ++i) {
@@ -930,6 +981,9 @@ void receiveIndividualMessagesFromBatch(MessageMetadata 
msgMetadata, ByteBuf unc
                 final MessageImpl<T> message = new 
MessageImpl<>(batchMessageIdImpl, msgMetadata,
                         singleMessageMetadataBuilder.build(), 
singleMessagePayload,
                         createEncryptionContext(msgMetadata), cnx, schema);
+                if (possibleToDeadLetter != null) {
+                    possibleToDeadLetter.add(message);
+                }
                 lock.readLock().lock();
                 try {
                     if (pendingReceives.isEmpty()) {
@@ -947,6 +1001,10 @@ void receiveIndividualMessagesFromBatch(MessageMetadata 
msgMetadata, ByteBuf unc
             log.warn("[{}] [{}] unable to obtain message in batch", 
subscription, consumerName);
             discardCorruptedMessage(messageId, cnx, 
ValidationError.BatchDeSerializeError);
         }
+        if (possibleToDeadLetter != null && 
possibleSendToDeadLetterTopicMessages != null) {
+            possibleSendToDeadLetterTopicMessages.put(batchMessage, 
possibleToDeadLetter);
+        }
+
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, 
available queue size - {}", subscription,
                     consumerName, incomingMessages.size(), 
incomingMessages.remainingCapacity());
@@ -1184,6 +1242,8 @@ public void 
redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
             MessageIdData.Builder builder = MessageIdData.newBuilder();
             batches.forEach(ids -> {
                 List<MessageIdData> messageIdDatas = 
ids.stream().map(messageId -> {
+                    // send the message to the dead letter topic if it is a candidate for it
+                    processPossibleToDLQ(messageId);
                     // attempt to remove message from batchMessageAckTracker
                     builder.setPartition(messageId.getPartitionIndex());
                     builder.setLedgerId(messageId.getLedgerId());
@@ -1212,6 +1272,43 @@ public void 
redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
         }
     }
 
+    private void processPossibleToDLQ(MessageIdImpl messageId) {
+        List<MessageImpl<T>> deadLetterMessages = null;
+        if (possibleSendToDeadLetterTopicMessages != null) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                deadLetterMessages = 
possibleSendToDeadLetterTopicMessages.get(new 
MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+                        getPartitionIndex()));
+            } else {
+                deadLetterMessages = 
possibleSendToDeadLetterTopicMessages.get(messageId);
+            }
+        }
+        if (deadLetterMessages != null) {
+            if (deadLetterProducer == null) {
+                try {
+                    deadLetterProducer = client.newProducer(schema)
+                            .topic(this.deadLetterPolicy.getDeadLetterTopic())
+                            .blockIfQueueFull(false)
+                            .create();
+                } catch (Exception e) {
+                    log.error("Create dead letter producer exception with 
topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
+                }
+            }
+            if (deadLetterProducer != null) {
+                try {
+                    for (MessageImpl<T> message : deadLetterMessages) {
+                        deadLetterProducer.newMessage()
+                                .value(message.getValue())
+                                .properties(message.getProperties())
+                                .send();
+                    }
+                    acknowledge(messageId);
+                } catch (Exception e) {
+                    log.error("Send to dead letter topic exception with topic: 
{}, messageId: {}", deadLetterProducer.getTopic(), messageId, e);
+                }
+            }
+        }
+    }
+
     @Override
     public void seek(MessageId messageId) throws PulsarClientException {
         try {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 47ede41de6..a0fd493494 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -38,6 +38,7 @@
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 
 @Data
 public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
@@ -82,6 +83,8 @@
 
     private int patternAutoDiscoveryPeriod = 1;
 
+    private DeadLetterPolicy deadLetterPolicy;
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index b5e2406c9e..c94482d022 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -282,10 +282,13 @@ public static MessageMetadata 
parseMessageMetadata(ByteBuf buffer) {
         }
     }
 
-    public static ByteBufPair newMessage(long consumerId, MessageIdData 
messageId, ByteBuf metadataAndPayload) {
+    public static ByteBufPair newMessage(long consumerId, MessageIdData 
messageId, int redeliveryCount, ByteBuf metadataAndPayload) {
         CommandMessage.Builder msgBuilder = CommandMessage.newBuilder();
         msgBuilder.setConsumerId(consumerId);
         msgBuilder.setMessageId(messageId);
+        if (redeliveryCount > 0) {
+            msgBuilder.setRedeliveryCount(redeliveryCount);
+        }
         CommandMessage msg = msgBuilder.build();
         BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder();
         BaseCommand cmd = 
cmdBuilder.setType(Type.MESSAGE).setMessage(msg).build();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index edc7e9b118..c6ff6c1668 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -13542,6 +13542,10 @@ void 
setMessage(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString val
     // required .pulsar.proto.MessageIdData message_id = 2;
     boolean hasMessageId();
     org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageId();
+    
+    // optional uint32 redelivery_count = 3 [default = 0];
+    boolean hasRedeliveryCount();
+    int getRedeliveryCount();
   }
   public static final class CommandMessage extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -13598,9 +13602,20 @@ public boolean hasMessageId() {
       return messageId_;
     }
     
+    // optional uint32 redelivery_count = 3 [default = 0];
+    public static final int REDELIVERY_COUNT_FIELD_NUMBER = 3;
+    private int redeliveryCount_;
+    public boolean hasRedeliveryCount() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public int getRedeliveryCount() {
+      return redeliveryCount_;
+    }
+    
     private void initFields() {
       consumerId_ = 0L;
       messageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+      redeliveryCount_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -13637,6 +13652,9 @@ public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeMessage(2, messageId_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeUInt32(3, redeliveryCount_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -13653,6 +13671,10 @@ public int getSerializedSize() {
         size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeMessageSize(2, messageId_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt32Size(3, redeliveryCount_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -13770,6 +13792,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000001);
         messageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x00000002);
+        redeliveryCount_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
       
@@ -13811,6 +13835,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000002;
         }
         result.messageId_ = messageId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.redeliveryCount_ = redeliveryCount_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -13823,6 +13851,9 @@ public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandMes
         if (other.hasMessageId()) {
           mergeMessageId(other.getMessageId());
         }
+        if (other.hasRedeliveryCount()) {
+          setRedeliveryCount(other.getRedeliveryCount());
+        }
         return this;
       }
       
@@ -13879,6 +13910,11 @@ public Builder mergeFrom(
               subBuilder.recycle();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              redeliveryCount_ = input.readUInt32();
+              break;
+            }
           }
         }
       }
@@ -13949,6 +13985,27 @@ public Builder clearMessageId() {
         return this;
       }
       
+      // optional uint32 redelivery_count = 3 [default = 0];
+      private int redeliveryCount_ ;
+      public boolean hasRedeliveryCount() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public int getRedeliveryCount() {
+        return redeliveryCount_;
+      }
+      public Builder setRedeliveryCount(int value) {
+        bitField0_ |= 0x00000004;
+        redeliveryCount_ = value;
+        
+        return this;
+      }
+      public Builder clearRedeliveryCount() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        redeliveryCount_ = 0;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandMessage)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 779a27bc1f..c50f0728e4 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -335,6 +335,7 @@ message CommandSendError {
 message CommandMessage {
        required uint64 consumer_id       = 1;
        required MessageIdData message_id = 2;
+       optional uint32 redelivery_count  = 3 [default = 0];
 }
 
 message CommandAck {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to