This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 57a616eaa79 [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)
57a616eaa79 is described below

commit 57a616eaa79096af5b49db89c99cd39ccc94ec00
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Mon Apr 8 18:22:05 2024 +0800

    [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +---
 .../service/persistent/PersistentSubscription.java |  4 +-
 .../service/persistent/PersistentTopicTest.java    | 56 +++++++++++++++++++++-
 4 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 4daa06cad57..69b130a98c8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -990,6 +990,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
             }
 
+            if (isClosed()) {
+                callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
+                return;
+            }
+
             if (!hasMoreEntries()) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3a12cb2ad6c..698563ed7a1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1032,6 +1032,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                         + consumerName), ctx);
             return;
         } else if (!cursor.isDurable()) {
+            cursor.setState(ManagedCursorImpl.State.Closed);
             cursors.removeCursor(consumerName);
             deactivateCursorByName(consumerName);
             callback.deleteCursorComplete(ctx);
@@ -3814,13 +3815,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     public void addWaitingCursor(ManagedCursorImpl cursor) {
-        if (cursor instanceof NonDurableCursorImpl) {
-            if (cursor.isActive()) {
-                this.waitingCursors.add(cursor);
-            }
-        } else {
-            this.waitingCursors.add(cursor);
-        }
+        this.waitingCursors.add(cursor);
     }
 
     public boolean isCursorActive(ManagedCursor cursor) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 6e8e94baeae..dbbf92aa76d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -308,7 +308,6 @@ public class PersistentSubscription extends AbstractSubscription {
 
         if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
             deactivateCursor();
-            topic.getManagedLedger().removeWaitingCursor(cursor);
 
             if (!cursor.isDurable()) {
                 // If cursor is not durable, we need to clean up the subscription as well. No need to check for active
@@ -338,11 +337,14 @@ public class PersistentSubscription extends AbstractSubscription {
                     if (!isResetCursor) {
                         try {
                             topic.getManagedLedger().deleteCursor(cursor.getName());
+                            topic.getManagedLedger().removeWaitingCursor(cursor);
                         } catch (InterruptedException | ManagedLedgerException e) {
                             log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e);
                         }
                     }
                 });
+            } else {
+                topic.getManagedLedger().removeWaitingCursor(cursor);
             }
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index d42b1d92007..c214634e6ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -80,6 +80,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -113,6 +114,11 @@ public class PersistentTopicTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
+    @Override protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setManagedLedgerCursorBackloggedThreshold(10);
+    }
+
     /**
      * Test validates that broker cleans up topic which failed to unload while bundle unloading.
      *
@@ -681,7 +687,7 @@ public class PersistentTopicTest extends BrokerTestBase {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
         final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
         doAnswer((invocation) -> {
-            Thread.sleep(10_000);
+            Thread.sleep(5_000);
             invocation.callRealMethod();
             return null;
         }).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class),
@@ -708,4 +714,52 @@ public class PersistentTopicTest extends BrokerTestBase {
             assertEquals(ledger.getWaitingCursorsCount(), 0);
         });
     }
+
+    @Test
+    public void testAddWaitingCursorsForNonDurable2() throws Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = "persistent://prop/ns-test/testAddWaitingCursors2";
+        admin.topics().createNonPartitionedTopic(topicName);
+        pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub-1").subscribe().close();
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topicName).create();
+        for (int i = 0; i < 100; i ++) {
+            producer.sendAsync("test-" + i);
+        }
+        @Cleanup
+        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionMode(SubscriptionMode.NonDurable)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("sub-2").subscribe();
+        int count = 0;
+        while(true) {
+            final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                count++;
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(count, 100);
+        Thread.sleep(3_000);
+        for (int i = 0; i < 100; i ++) {
+            producer.sendAsync("test-" + i);
+        }
+        while(true) {
+            final Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                count++;
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(count, 200);
+    }
 }