This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a1e1388111f5dafa6aa4f0fe0858033af76924cf
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Mar 9 22:53:48 2021 -0800

    [pulsar-broker] Allow broker to discover and unblock stuck subscription (#9789)
    
    We have frequently seen subscriptions get stuck on different topics: the
    broker stops dispatching messages even though the consumer has available
    permits and there are no pending reads (example: #9788). This can happen
    because of a regression or an unknown issue when expiry runs. One
    workaround is to manually unload and reload the topic, which is not
    feasible when this happens frequently across many topics. Instead, the
    broker should be able to discover such stuck subscriptions and unblock
    them.
    The example below shows a subscription with available permits > 0 and no
    pending reads whose cursor read position is not moving forward, so the
    backlog keeps building until the topic is unloaded. This happens
    frequently, for an unknown reason:
    ```
    STATS-INTERNAL:
    "sub1" : {
          "markDeletePosition" : "11111111:15520",
          "readPosition" : "11111111:15521",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "messagesConsumedCounter" : 115521,
          "cursorLedger" : 585099247,
          "cursorLedgerLastEntry" : 597,
          "individuallyDeletedMessages" : "[]",
          "lastLedgerSwitchTimestamp" : "2021-02-25T19:55:50.357Z",
          "state" : "Open",
          "numberOfEntriesSinceFirstNotAckedMessage" : 1,
          "totalNonContiguousDeletedMessagesRange" : 0,
    
    STATS:
    "sub1" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 30350,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Shared",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "C1",
            "availablePermits" : 723,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2021-02-25T19:55:50.358285Z",
    
    ```
    
    
![image](https://user-images.githubusercontent.com/2898254/109894631-ab62d980-7c42-11eb-8dcc-a1a5f4f5d14e.png)
    
    Add the capability for the broker to periodically check whether a
    subscription is stuck and unblock it if needed. The check is controlled
    by a flag; for the initial release it is disabled by default, and it can
    be enabled by default in a later release.
    
    This helps the broker handle stuck subscriptions and logs a message for
    later debugging.
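    
    The detection rule can be sketched in isolation as follows. This is a
    minimal illustration under simplifying assumptions, not the broker code:
    positions are plain entry ids rather than PositionImpl, and the class
    StuckDispatcherSketch and its fields are hypothetical names. A dispatcher
    is treated as stuck only when the read position is neither on the tail
    nor changed since the previous check, while permits and backlog are
    available and no read is pending.
    
    ```java
    /** Hypothetical, simplified model of the stuck-subscription check. */
    final class StuckDispatcherSketch {
        // Entry ids stand in for ledger positions (assumption for this sketch).
        long lastConfirmedEntry = -1;        // -1 models an empty ledger
        long readPosition = 0;               // next entry the cursor would read
        Long lastSampledReadPosition = null; // sample taken by the previous check
    
        int availablePermits = 0;
        boolean havePendingRead = false;
        int readsIssued = 0;                 // counts simulated read requests
    
        /** True if the cursor is on the tail or moved since the last call. */
        boolean checkAndUpdateReadPositionChanged() {
            boolean onTail = !(lastConfirmedEntry > readPosition);
            boolean changed = lastSampledReadPosition == null
                    || lastSampledReadPosition.longValue() != readPosition;
            lastSampledReadPosition = readPosition;
            return onTail || changed;
        }
    
        /** Issues a read and returns true only if the dispatcher looks stuck. */
        boolean checkAndUnblockIfStuck() {
            if (checkAndUpdateReadPositionChanged()) {
                return false;
            }
            long backlog = lastConfirmedEntry - readPosition + 1;
            if (availablePermits > 0 && !havePendingRead && backlog > 0) {
                readsIssued++; // stands in for issuing a real read
                return true;
            }
            return false;
        }
    
        public static void main(String[] args) {
            StuckDispatcherSketch d = new StuckDispatcherSketch();
            d.lastConfirmedEntry = 3; // four entries written, none read
            d.availablePermits = 10;
            System.out.println(d.checkAndUnblockIfStuck()); // false: first pass only records the sample
            System.out.println(d.checkAndUnblockIfStuck()); // true: position unchanged, so a read is issued
        }
    }
    ```
    
    Two consecutive checks are needed before anything is unblocked, because
    the first call only records the read position to compare against.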
---
 conf/broker.conf                                   |  3 +
 deployment/terraform-ansible/templates/broker.conf |  3 +
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  6 ++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 12 ++++
 .../bookkeeper/mledger/impl/PositionImpl.java      |  1 -
 .../mledger/impl/ManagedCursorContainerTest.java   |  5 ++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 +++++++++++++++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++
 .../apache/pulsar/broker/service/Dispatcher.java   |  7 ++
 .../PersistentDispatcherMultipleConsumers.java     | 15 ++++
 .../PersistentDispatcherSingleActiveConsumer.java  | 15 ++++
 .../service/persistent/PersistentSubscription.java |  4 ++
 .../broker/service/persistent/PersistentTopic.java |  4 ++
 .../service/persistent/PersistentTopicTest.java    | 83 +++++++++++++++++++++-
 site2/docs/reference-configuration.md              |  1 +
 15 files changed, 224 insertions(+), 3 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index f286961..92a2c1d 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -259,6 +259,9 @@ maxUnackedMessagesPerBroker=0
 # limit/2 messages
 maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
 
+# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
+unblockStuckSubscriptionEnabled=false
+
 # Tick time to schedule task that checks topic publish rate limiting across all topics
 # Reducing to lower value can give more accuracy while throttling publish but
 # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index 638c889..91dc813 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -227,6 +227,9 @@ maxUnackedMessagesPerBroker=0
 # limit/2 messages
 maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
 
+# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
+unblockStuckSubscriptionEnabled=false
+
 # Tick time to schedule task that checks topic publish rate limiting across all topics
 # Reducing to lower value can give more accuracy while throttling publish but
 # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index a9724de..320a030 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -673,4 +673,10 @@ public interface ManagedCursor {
      */
     ManagedCursorMXBean getStats();
 
+    /**
+     * Checks if read position changed since this method was called last time.
+     *
+     * @return if read position changed
+     */
+    boolean checkAndUpdateReadPositionChanged();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f732a75..f114f6d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -119,6 +119,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl> READ_POSITION_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition");
     protected volatile PositionImpl readPosition;
+    // keeps sample of last read-position for validation and monitoring if read-position is not moving forward.
+    protected volatile PositionImpl statsLastReadPosition;
 
     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry> LAST_MARK_DELETE_ENTRY_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, MarkDeleteEntry.class, "lastMarkDeleteEntry");
@@ -2969,5 +2971,15 @@ public class ManagedCursorImpl implements ManagedCursor {
         return Math.min(maxEntriesBasedOnSize, maxEntries);
     }
 
+    @Override
+    public boolean checkAndUpdateReadPositionChanged() {
+        PositionImpl lastEntry = ledger.lastConfirmedEntry;
+        boolean isReadPositionOnTail = lastEntry == null || readPosition == null
+                || !(lastEntry.compareTo(readPosition) > 0);
+        boolean isReadPositionChanged = readPosition != null && !readPosition.equals(statsLastReadPosition);
+        statsLastReadPosition = readPosition;
+        return isReadPositionOnTail || isReadPositionChanged;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index ef445f9..e4a3060 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -125,7 +125,6 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
             PositionImpl other = (PositionImpl) obj;
             return ledgerId == other.ledgerId && entryId == other.entryId;
         }
-
         return false;
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index d0b0b2c..4801aab 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -357,6 +357,11 @@ public class ManagedCursorContainerTest {
                 throws InterruptedException, ManagedLedgerException {
             return null;
         }
+
+        @Override
+        public boolean checkAndUpdateReadPositionChanged() {
+            return false;
+        }
     }
 
     @Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4f6e1a0..f34cc8c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3414,12 +3414,71 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
                     } finally {
                         factory2.shutdown();
-                    }
+                    }   
                 });
 
         factory1.shutdown();
         dirtyFactory.shutdown();
     }
 
+    @Test
+    public void testCursorCheckReadPositionChanged() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
+        ManagedCursor c1 = ledger.openCursor("c1");
+
+        // check empty ledger
+        assertTrue(c1.checkAndUpdateReadPositionChanged());
+        assertTrue(c1.checkAndUpdateReadPositionChanged());
+
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+
+        // read-position has not been moved
+        assertFalse(c1.checkAndUpdateReadPositionChanged());
+
+        List<Entry> entries = c1.readEntries(2);
+        entries.forEach(e -> {
+            try {
+                c1.markDelete(e.getPosition());
+                e.release();
+            } catch (Exception e1) {
+                // Ok
+            }
+        });
+
+        // read-position is moved
+        assertTrue(c1.checkAndUpdateReadPositionChanged());
+        // read-position has not been moved since last read
+        assertFalse(c1.checkAndUpdateReadPositionChanged());
+
+        c1.close();
+        ledger.close();
+
+        ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
+        // recover cursor
+        ManagedCursor c2 = ledger.openCursor("c1");
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+        assertFalse(c2.checkAndUpdateReadPositionChanged());
+
+        entries = c2.readEntries(2);
+        entries.forEach(e -> {
+            try {
+                c2.markDelete(e.getPosition());
+                e.release();
+            } catch (Exception e1) {
+                // Ok
+            }
+        });
+
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+        // returns true because read-position is on tail
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+
+        ledger.close();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5979e63..2614706 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -509,6 +509,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @FieldContext(
             category = CATEGORY_POLICIES,
             dynamic = true,
+            doc = "Broker periodically checks if subscription is stuck and unblock if flag is enabled. "
+                    + "(Default is disabled)"
+        )
+    private boolean unblockStuckSubscriptionEnabled = false;
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            dynamic = true,
             doc = "Tick time to schedule task that checks topic publish rate limiting across all topics  "
                     + "Reducing to lower value can give more accuracy while throttling publish but "
                     + "it uses more CPU to perform frequent check. (Disable publish throttling with value 0)"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 43b02d1..e30d54c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -125,4 +125,11 @@ public interface Dispatcher {
         // No-op
     }
 
+    /**
+     * Checks if dispatcher is stuck and unblocks the dispatch if needed.
+     */
+    default boolean checkAndUnblockIfStuck() {
+        return false;
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 934873a..f3c6d94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -865,6 +865,21 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         this.messagesToRedeliver.add(ledgerId, entryId);
     }
 
+    @Override
+    public boolean checkAndUnblockIfStuck() {
+        if (cursor.checkAndUpdateReadPositionChanged()) {
+            return false;
+        }
+        // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
+        if (totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead
+                && cursor.getNumberOfEntriesInBacklog(false) > 0) {
+            log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
+            readMoreEntries();
+            return true;
+        }
+        return false;
+    }
+
     public PersistentTopic getTopic() {
         return topic;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 48c82d1..468f413 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -615,5 +615,20 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         this.messagesToRedeliver.add(ledgerId, entryId);
     }
 
+    public boolean checkAndUnblockIfStuck() {
+        if (cursor.checkAndUpdateReadPositionChanged()) {
+            return false;
+        }
+        Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
+        int totalAvailablePermits = consumer.getAvailablePermits();
+        // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
+        if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
+            log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
+            readMoreEntries(consumer);
+            return true;
+        }
+        return false;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index b01e5cb..57281cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1089,5 +1089,9 @@ public class PersistentSubscription implements Subscription {
         return this.pendingAckHandle.checkIsCanDeleteConsumerPendingAck(position);
     }
 
+    public boolean checkAndUnblockIfStuck() {
+        return dispatcher.checkAndUnblockIfStuck();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d87707a..3ba512a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1534,6 +1534,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 topicStatsHelper.aggMsgRateOut += subMsgRateOut;
                 topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
                 nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false);
+                // check stuck subscription
+                if (brokerService.getPulsar().getConfig().isUnblockStuckSubscriptionEnabled()) {
+                    subscription.checkAndUnblockIfStuck();
+                }
             } catch (Exception e) {
                 log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName,
                         e.getMessage(), e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index f4d11b8..547baef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -21,14 +21,32 @@ package org.apache.pulsar.broker.service.persistent;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.awaitility.Awaitility;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -72,4 +90,67 @@ public class PersistentTopicTest extends BrokerTestBase {
         persistentTopic.onPoliciesUpdate(policies);
         verify(persistentTopic, times(1)).checkReplicationAndRetryOnFailure();
     }
-}
\ No newline at end of file
+
+    /**
+     * Test validates if topic's dispatcher is stuck then broker can discover and unblock it.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testUnblockStuckSubscription() throws Exception {
+        final String topicName = "persistent://prop/ns-abc/stuckSubscriptionTopic";
+        final String sharedSubName = "shared";
+        final String failoverSubName = "failOver";
+
+        Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionType(SubscriptionType.Shared).subscriptionName(sharedSubName).subscribe();
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionType(SubscriptionType.Failover).subscriptionName(failoverSubName).subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentSubscription sharedSub = topic.getSubscription(sharedSubName);
+        PersistentSubscription failOverSub = topic.getSubscription(failoverSubName);
+
+        PersistentDispatcherMultipleConsumers sharedDispatcher = (PersistentDispatcherMultipleConsumers) sharedSub
+                .getDispatcher();
+        PersistentDispatcherSingleActiveConsumer failOverDispatcher = (PersistentDispatcherSingleActiveConsumer) failOverSub
+                .getDispatcher();
+
+        // build backlog
+        consumer1.close();
+        consumer2.close();
+
+        // block sub to read messages
+        sharedDispatcher.havePendingRead = true;
+        failOverDispatcher.havePendingRead = true;
+
+        producer.newMessage().value("test").eventTime(5).send();
+        producer.newMessage().value("test").eventTime(5).send();
+
+        consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(sharedSubName).subscribe();
+        consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Failover)
+                .subscriptionName(failoverSubName).subscribe();
+        Message<String> msg = consumer1.receive(2, TimeUnit.SECONDS);
+        assertNull(msg);
+        msg = consumer2.receive(2, TimeUnit.SECONDS);
+        assertNull(msg);
+
+        // allow reads but dispatchers are still blocked
+        sharedDispatcher.havePendingRead = false;
+        failOverDispatcher.havePendingRead = false;
+
+        // run task to unblock stuck dispatcher: first iteration sets the lastReadPosition and next iteration will
+        // unblock the dispatcher read because read-position has not been moved since last iteration.
+        sharedSub.checkAndUnblockIfStuck();
+        failOverDispatcher.checkAndUnblockIfStuck();
+        assertTrue(sharedSub.checkAndUnblockIfStuck());
+        assertTrue(failOverDispatcher.checkAndUnblockIfStuck());
+
+        msg = consumer1.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        msg = consumer2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+    }
+}
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 28cf79e..eb4076c 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -480,6 +480,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
 |maxUnackedMessagesPerSubscription| The same as above, except per subscription rather than per consumer.  |200000|
 | maxUnackedMessagesPerBroker | Maximum number of unacknowledged messages allowed per broker. Once this limit reaches, the broker stops dispatching messages to all shared subscriptions which has a higher number of unacknowledged messages until subscriptions start acknowledging messages back and unacknowledged messages count reaches to limit/2. When the value is set to 0, unacknowledged message limit check is disabled and broker does not block dispatchers. | 0 |
 | maxUnackedMessagesPerSubscriptionOnBrokerBlocked | Once the broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which have higher unacknowledged messages than this percentage limit and subscription does not receive any new messages until that subscription acknowledges messages back. | 0.16 |
+| unblockStuckSubscriptionEnabled|Broker periodically checks if subscription is stuck and unblock if flag is enabled.|false|
 |maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned topic. Use 0 or negative number to disable the check|0|
 |zookeeperSessionExpiredPolicy|There are two policies when ZooKeeper session expired happens, "shutdown" and "reconnect". If it is set to "shutdown" policy, when ZooKeeper session expired happens, the broker is shutdown. If it is set to "reconnect" policy, the broker tries to reconnect to ZooKeeper server and re-register metadata to ZooKeeper. Note: the "reconnect" policy is an experiment feature.|shutdown|
 | topicPublisherThrottlingTickTimeMillis | Tick time to schedule task that checks topic publish rate limiting across all topics. A lower value can improve accuracy while throttling publish but it uses more CPU to perform frequent check. (Disable publish throttling with value 0) | 10|
