This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a1e1388111f5dafa6aa4f0fe0858033af76924cf
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Mar 9 22:53:48 2021 -0800

[pulsar-broker] Allow broker to discover and unblock stuck subscription (#9789)

We have frequently seen subscriptions get stuck on different topics: the broker stops dispatching messages even though the consumer has available permits and there are no pending reads (example: #9788). This can happen because of a regression bug or an unknown issue when expiry runs. One workaround is to manually unload and reload the topic, which is not feasible when this happens frequently on many topics. Alternatively, the broker should be able to discover such stuck subscriptions and unblock them.

The example below shows a subscription that has available permits > 0 and no pending reads, yet the cursor's read position is not moving forward, so the backlog keeps building until the topic is unloaded. This happens frequently for an unknown reason:

```
STATS-INTERNAL:
"sub1" : {
  "markDeletePosition" : "11111111:15520",
  "readPosition" : "11111111:15521",
  "waitingReadOp" : false,
  "pendingReadOps" : 0,
  "messagesConsumedCounter" : 115521,
  "cursorLedger" : 585099247,
  "cursorLedgerLastEntry" : 597,
  "individuallyDeletedMessages" : "[]",
  "lastLedgerSwitchTimestamp" : "2021-02-25T19:55:50.357Z",
  "state" : "Open",
  "numberOfEntriesSinceFirstNotAckedMessage" : 1,
  "totalNonContiguousDeletedMessagesRange" : 0,

STATS:
"sub1" : {
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "msgRateRedeliver" : 0.0,
  "msgBacklog" : 30350,
  "blockedSubscriptionOnUnackedMsgs" : false,
  "msgDelayed" : 0,
  "unackedMessages" : 0,
  "type" : "Shared",
  "msgRateExpired" : 0.0,
  "consumers" : [ {
    "msgRateOut" : 0.0,
    "msgThroughputOut" : 0.0,
    "msgRateRedeliver" : 0.0,
    "consumerName" : "C1",
    "availablePermits" : 723,
    "unackedMessages" : 0,
    "blockedConsumerOnUnackedMsgs" : false,
    "metadata" : { },
    "connectedSince" : "2021-02-25T19:55:50.358285Z",
```
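The condition visible in these stats can be written down as a small predicate. The sketch below is illustrative only: `StuckSubscriptionCheck`, `Snapshot`, and `looksStuck` are hypothetical names that are not part of Pulsar, and the fields are a hand-picked subset of the stats above. It assumes two stats samples taken some time apart.

```java
// Illustrative sketch only: these types are hypothetical and not part of Pulsar.
final class StuckSubscriptionCheck {

    /** Hand-picked subset of the subscription stats shown in the commit message. */
    static final class Snapshot {
        final int availablePermits;
        final int pendingReadOps;
        final long msgBacklog;
        final String readPosition;

        Snapshot(int availablePermits, int pendingReadOps, long msgBacklog, String readPosition) {
            this.availablePermits = availablePermits;
            this.pendingReadOps = pendingReadOps;
            this.msgBacklog = msgBacklog;
            this.readPosition = readPosition;
        }
    }

    /**
     * A subscription looks stuck when, between two samples, the read position did not
     * move although the consumer had permits, no read was pending, and a backlog existed.
     */
    static boolean looksStuck(Snapshot previous, Snapshot current) {
        boolean readPositionMoved = !current.readPosition.equals(previous.readPosition);
        return !readPositionMoved
                && current.availablePermits > 0
                && current.pendingReadOps == 0
                && current.msgBacklog > 0;
    }

    public static void main(String[] args) {
        // Values taken from the stats above; the second sample is assumed to be identical.
        Snapshot first = new Snapshot(723, 0, 30350, "11111111:15521");
        Snapshot second = new Snapshot(723, 0, 30350, "11111111:15521");
        System.out.println("stuck=" + looksStuck(first, second));
    }
}
```

A single sample is not enough to decide, because a healthy subscription can briefly show the same values between two reads; that is why the check compares the read position across two samples.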
![image](https://user-images.githubusercontent.com/2898254/109894631-ab62d980-7c42-11eb-8dcc-a1a5f4f5d14e.png)

Add a capability to the broker to periodically check whether a subscription is stuck and unblock it if needed. The check is controlled by a flag. For the initial release it is disabled by default, and it can be enabled by default in a later release.

This helps the broker handle stuck subscriptions, and it logs a message for later debugging.
---
 conf/broker.conf                                   |  3 +
 deployment/terraform-ansible/templates/broker.conf |  3 +
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  6 ++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 12 ++++
 .../bookkeeper/mledger/impl/PositionImpl.java      |  1 -
 .../mledger/impl/ManagedCursorContainerTest.java   |  5 ++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 +++++++++++++++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++
 .../apache/pulsar/broker/service/Dispatcher.java   |  7 ++
 .../PersistentDispatcherMultipleConsumers.java     | 15 ++++
 .../PersistentDispatcherSingleActiveConsumer.java  | 15 ++++
 .../service/persistent/PersistentSubscription.java |  4 ++
 .../broker/service/persistent/PersistentTopic.java |  4 ++
 .../service/persistent/PersistentTopicTest.java    | 83 +++++++++++++++++++++-
 site2/docs/reference-configuration.md              |  1 +
 15 files changed, 224 insertions(+), 3 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index f286961..92a2c1d 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -259,6 +259,9 @@ maxUnackedMessagesPerBroker=0
 # limit/2 messages
 maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

+# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
+unblockStuckSubscriptionEnabled=false
+
 # Tick time to schedule task that checks topic publish rate limiting across all topics
 # Reducing to lower value can give more accuracy while throttling publish but
 # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index 638c889..91dc813 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -227,6 +227,9 @@ maxUnackedMessagesPerBroker=0
 # limit/2 messages
 maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

+# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
+unblockStuckSubscriptionEnabled=false
+
 # Tick time to schedule task that checks topic publish rate limiting across all topics
 # Reducing to lower value can give more accuracy while throttling publish but
 # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index a9724de..320a030 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -673,4 +673,10 @@ public interface ManagedCursor {
      */
     ManagedCursorMXBean getStats();

+    /**
+     * Checks if read position changed since this method was called last time.
+     *
+     * @return if read position changed
+     */
+    boolean checkAndUpdateReadPositionChanged();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f732a75..f114f6d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -119,6 +119,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl> READ_POSITION_UPDATER =
         AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition");
     protected volatile PositionImpl readPosition;
+    // keeps sample of last read-position for validation and monitoring if read-position is not moving forward.
+    protected volatile PositionImpl statsLastReadPosition;

     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry> LAST_MARK_DELETE_ENTRY_UPDATER =
         AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, MarkDeleteEntry.class, "lastMarkDeleteEntry");
@@ -2969,5 +2971,15 @@ public class ManagedCursorImpl implements ManagedCursor {
         return Math.min(maxEntriesBasedOnSize, maxEntries);
     }

+    @Override
+    public boolean checkAndUpdateReadPositionChanged() {
+        PositionImpl lastEntry = ledger.lastConfirmedEntry;
+        boolean isReadPositionOnTail = lastEntry == null || readPosition == null
+                || !(lastEntry.compareTo(readPosition) > 0);
+        boolean isReadPositionChanged = readPosition != null && !readPosition.equals(statsLastReadPosition);
+        statsLastReadPosition = readPosition;
+        return isReadPositionOnTail || isReadPositionChanged;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index ef445f9..e4a3060 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -125,7 +125,6 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
             PositionImpl other = (PositionImpl) obj;
             return ledgerId == other.ledgerId && entryId == other.entryId;
         }
-
         return false;
     }

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index d0b0b2c..4801aab 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -357,6 +357,11 @@ public class ManagedCursorContainerTest {
                 throws InterruptedException, ManagedLedgerException {
             return null;
         }
+
+        @Override
+        public boolean checkAndUpdateReadPositionChanged() {
+            return false;
+        }
     }

     @Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4f6e1a0..f34cc8c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3414,12 +3414,71 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {

             } finally {
                 factory2.shutdown();
-            }
+            }
         });

         factory1.shutdown();
         dirtyFactory.shutdown();
     }

+    @Test
+    public void testCursorCheckReadPositionChanged() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
+        ManagedCursor c1 = ledger.openCursor("c1");
+
+        // check empty ledger
+        assertTrue(c1.checkAndUpdateReadPositionChanged());
+        assertTrue(c1.checkAndUpdateReadPositionChanged());
+
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+
+        // read-position has not been moved
+        assertFalse(c1.checkAndUpdateReadPositionChanged());
+
+        List<Entry> entries = c1.readEntries(2);
+        entries.forEach(e -> {
+            try {
+                c1.markDelete(e.getPosition());
+                e.release();
+            } catch (Exception e1) {
+                // Ok
+            }
+        });
+
+        // read-position is moved
+        assertTrue(c1.checkAndUpdateReadPositionChanged());
+        // read-position has not been moved since last read
+        assertFalse(c1.checkAndUpdateReadPositionChanged());
+
+        c1.close();
+        ledger.close();
+
+        ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
+        // recover cursor
+        ManagedCursor c2 = ledger.openCursor("c1");
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+        assertFalse(c2.checkAndUpdateReadPositionChanged());
+
+        entries = c2.readEntries(2);
+        entries.forEach(e -> {
+            try {
+                c2.markDelete(e.getPosition());
+                e.release();
+            } catch (Exception e1) {
+                // Ok
+            }
+        });
+
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+        // returns true because read-position is on tail
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+        assertTrue(c2.checkAndUpdateReadPositionChanged());
+
+        ledger.close();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5979e63..2614706 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -509,6 +509,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @FieldContext(
         category = CATEGORY_POLICIES,
         dynamic = true,
+        doc = "Broker periodically checks if subscription is stuck and unblock if flag is enabled. "
+                + "(Default is disabled)"
+    )
+    private boolean unblockStuckSubscriptionEnabled = false;
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        dynamic = true,
         doc = "Tick time to schedule task that checks topic publish rate limiting across all topics "
             + "Reducing to lower value can give more accuracy while throttling publish but "
             + "it uses more CPU to perform frequent check. (Disable publish throttling with value 0)"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 43b02d1..e30d54c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -125,4 +125,11 @@ public interface Dispatcher {
         // No-op
     }

+    /**
+     * Checks if dispatcher is stuck and unblocks the dispatch if needed.
+     */
+    default boolean checkAndUnblockIfStuck() {
+        return false;
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 934873a..f3c6d94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -865,6 +865,21 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         this.messagesToRedeliver.add(ledgerId, entryId);
     }

+    @Override
+    public boolean checkAndUnblockIfStuck() {
+        if (cursor.checkAndUpdateReadPositionChanged()) {
+            return false;
+        }
+        // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
+        if (totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead
+                && cursor.getNumberOfEntriesInBacklog(false) > 0) {
+            log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
+            readMoreEntries();
+            return true;
+        }
+        return false;
+    }
+
     public PersistentTopic getTopic() {
         return topic;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 48c82d1..468f413 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -615,5 +615,20 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         this.messagesToRedeliver.add(ledgerId, entryId);
     }

+    public boolean checkAndUnblockIfStuck() {
+        if (cursor.checkAndUpdateReadPositionChanged()) {
+            return false;
+        }
+        Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
+        int totalAvailablePermits = consumer.getAvailablePermits();
+        // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
+        if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
+            log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
+            readMoreEntries(consumer);
+            return true;
+        }
+        return false;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index b01e5cb..57281cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1089,5 +1089,9 @@ public class PersistentSubscription implements Subscription {
         return this.pendingAckHandle.checkIsCanDeleteConsumerPendingAck(position);
     }

+    public boolean checkAndUnblockIfStuck() {
+        return dispatcher.checkAndUnblockIfStuck();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d87707a..3ba512a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1534,6 +1534,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     topicStatsHelper.aggMsgRateOut += subMsgRateOut;
                     topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
                     nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false);
+                    // check stuck subscription
+                    if (brokerService.getPulsar().getConfig().isUnblockStuckSubscriptionEnabled()) {
+                        subscription.checkAndUnblockIfStuck();
+                    }
                 } catch (Exception e) {
                     log.error("Got exception when creating consumer stats for subscription {}: {}",
                             subscriptionName, e.getMessage(), e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index f4d11b8..547baef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -21,14 +21,32 @@ package org.apache.pulsar.broker.service.persistent;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;

 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;

 import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.awaitility.Awaitility;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -72,4 +90,67 @@ public class PersistentTopicTest extends BrokerTestBase {
         persistentTopic.onPoliciesUpdate(policies);
         verify(persistentTopic, times(1)).checkReplicationAndRetryOnFailure();
     }
-}
\ No newline at end of file
+
+    /**
+     * Test validates if topic's dispatcher is stuck then broker can doscover and unblock it.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testUnblockStuckSubscription() throws Exception {
+        final String topicName = "persistent://prop/ns-abc/stuckSubscriptionTopic";
+        final String sharedSubName = "shared";
+        final String failoverSubName = "failOver";
+
+        Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionType(SubscriptionType.Shared).subscriptionName(sharedSubName).subscribe();
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionType(SubscriptionType.Failover).subscriptionName(failoverSubName).subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentSubscription sharedSub = topic.getSubscription(sharedSubName);
+        PersistentSubscription failOverSub = topic.getSubscription(failoverSubName);
+
+        PersistentDispatcherMultipleConsumers sharedDispatcher = (PersistentDispatcherMultipleConsumers) sharedSub
+                .getDispatcher();
+        PersistentDispatcherSingleActiveConsumer failOverDispatcher = (PersistentDispatcherSingleActiveConsumer) failOverSub
+                .getDispatcher();
+
+        // build backlog
+        consumer1.close();
+        consumer2.close();
+
+        // block sub to read messages
+        sharedDispatcher.havePendingRead = true;
+        failOverDispatcher.havePendingRead = true;
+
+        producer.newMessage().value("test").eventTime(5).send();
+        producer.newMessage().value("test").eventTime(5).send();
+
+        consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(sharedSubName).subscribe();
+        consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Failover)
+                .subscriptionName(failoverSubName).subscribe();
+        Message<String> msg = consumer1.receive(2, TimeUnit.SECONDS);
+        assertNull(msg);
+        msg = consumer2.receive(2, TimeUnit.SECONDS);
+        assertNull(msg);
+
+        // allow reads but dispatchers are still blocked
+        sharedDispatcher.havePendingRead = false;
+        failOverDispatcher.havePendingRead = false;
+
+        // run task to unblock stuck dispatcher: first iteration sets the lastReadPosition and next iteration will
+        // unblock the dispatcher read because read-position has not been moved since last iteration.
+        sharedSub.checkAndUnblockIfStuck();
+        failOverDispatcher.checkAndUnblockIfStuck();
+        assertTrue(sharedSub.checkAndUnblockIfStuck());
+        assertTrue(failOverDispatcher.checkAndUnblockIfStuck());
+
+        msg = consumer1.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        msg = consumer2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(msg);
+    }
+}
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 28cf79e..eb4076c 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -480,6 +480,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
 |maxUnackedMessagesPerSubscription| The same as above, except per subscription rather than per consumer. |200000|
 | maxUnackedMessagesPerBroker | Maximum number of unacknowledged messages allowed per broker. Once this limit reaches, the broker stops dispatching messages to all shared subscriptions which has a higher number of unacknowledged messages until subscriptions start acknowledging messages back and unacknowledged messages count reaches to limit/2. When the value is set to 0, unacknowledged message limit check is disabled and broker does not block dispatchers. | 0 |
 | maxUnackedMessagesPerSubscriptionOnBrokerBlocked | Once the broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which have higher unacknowledged messages than this percentage limit and subscription does not receive any new messages until that subscription acknowledges messages back. | 0.16 |
+| unblockStuckSubscriptionEnabled|Broker periodically checks if subscription is stuck and unblock if flag is enabled.|false|
 |maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned topic. Use 0 or negative number to disable the check|0|
 |zookeeperSessionExpiredPolicy|There are two policies when ZooKeeper session expired happens, "shutdown" and "reconnect". If it is set to "shutdown" policy, when ZooKeeper session expired happens, the broker is shutdown. If it is set to "reconnect" policy, the broker tries to reconnect to ZooKeeper server and re-register metadata to ZooKeeper. Note: the "reconnect" policy is an experiment feature.|shutdown|
 | topicPublisherThrottlingTickTimeMillis | Tick time to schedule task that checks topic publish rate limiting across all topics. A lower value can improve accuracy while throttling publish but it uses more CPU to perform frequent check. (Disable publish throttling with value 0) | 10|
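
The two-pass behavior of `checkAndUpdateReadPositionChanged()` in this commit (the first call records the read position, and a later call reports whether it moved) can be tried in isolation with a simplified stand-in. This is a sketch under stated assumptions, not the Pulsar implementation: `ReadPositionSampler`, `append`, and `read` are hypothetical names, and positions are plain `long` values instead of `PositionImpl`.

```java
// Simplified stand-in for the cursor check in this commit. Hypothetical class:
// positions are plain longs here, whereas the real code compares PositionImpl objects.
final class ReadPositionSampler {
    private long readPosition = 0;          // next entry to read
    private long lastConfirmedEntry = -1;   // last entry written; -1 means nothing written yet
    private Long lastSampledReadPosition;   // null until the first sample is taken

    /** Simulates a producer adding one entry to the ledger. */
    void append() {
        lastConfirmedEntry++;
    }

    /** Simulates the dispatcher reading up to n entries. */
    void read(int n) {
        readPosition = Math.min(readPosition + n, lastConfirmedEntry + 1);
    }

    /**
     * Returns true when the cursor is on the tail (last entry is not ahead of the read
     * position) or when the read position changed since the previous call.
     */
    boolean checkAndUpdateReadPositionChanged() {
        boolean onTail = !(lastConfirmedEntry > readPosition);
        boolean changed = lastSampledReadPosition == null || lastSampledReadPosition != readPosition;
        lastSampledReadPosition = readPosition;
        return onTail || changed;
    }

    public static void main(String[] args) {
        ReadPositionSampler cursor = new ReadPositionSampler();
        cursor.checkAndUpdateReadPositionChanged();   // first pass records the sample
        for (int i = 0; i < 4; i++) {
            cursor.append();
        }
        // Backlog exists and nothing was read: the second pass reports "not changed".
        System.out.println("changed=" + cursor.checkAndUpdateReadPositionChanged());
    }
}
```

A `false` result is what lets the dispatcher go on to its own conditions (permits, no pending read, backlog) and issue a read; in the broker this path only runs when `unblockStuckSubscriptionEnabled` is set to `true`.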