This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 57a616eaa79 [fix][broker] Fix consumer stops receiving messages when 
with large backlogs processing (#22454)
57a616eaa79 is described below

commit 57a616eaa79096af5b49db89c99cd39ccc94ec00
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Mon Apr 8 18:22:05 2024 +0800

    [fix][broker] Fix consumer stops receiving messages when with large 
backlogs processing (#22454)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +---
 .../service/persistent/PersistentSubscription.java |  4 +-
 .../service/persistent/PersistentTopicTest.java    | 56 +++++++++++++++++++++-
 4 files changed, 65 insertions(+), 9 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 4daa06cad57..69b130a98c8 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -990,6 +990,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.debug("[{}] [{}] Re-trying the read at position {}", 
ledger.getName(), name, op.readPosition);
             }
 
+            if (isClosed()) {
+                callback.readEntriesFailed(new 
CursorAlreadyClosedException("Cursor was already closed"), ctx);
+                return;
+            }
+
             if (!hasMoreEntries()) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Still no entries available. Register 
for notification", ledger.getName(),
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3a12cb2ad6c..698563ed7a1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1032,6 +1032,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     + consumerName), ctx);
             return;
         } else if (!cursor.isDurable()) {
+            cursor.setState(ManagedCursorImpl.State.Closed);
             cursors.removeCursor(consumerName);
             deactivateCursorByName(consumerName);
             callback.deleteCursorComplete(ctx);
@@ -3814,13 +3815,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     public void addWaitingCursor(ManagedCursorImpl cursor) {
-        if (cursor instanceof NonDurableCursorImpl) {
-            if (cursor.isActive()) {
-                this.waitingCursors.add(cursor);
-            }
-        } else {
-            this.waitingCursors.add(cursor);
-        }
+        this.waitingCursors.add(cursor);
     }
 
     public boolean isCursorActive(ManagedCursor cursor) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 6e8e94baeae..dbbf92aa76d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -308,7 +308,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
 
         if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
             deactivateCursor();
-            topic.getManagedLedger().removeWaitingCursor(cursor);
 
             if (!cursor.isDurable()) {
                 // If cursor is not durable, we need to clean up the 
subscription as well. No need to check for active
@@ -338,11 +337,14 @@ public class PersistentSubscription extends 
AbstractSubscription {
                     if (!isResetCursor) {
                         try {
                             
topic.getManagedLedger().deleteCursor(cursor.getName());
+                            
topic.getManagedLedger().removeWaitingCursor(cursor);
                         } catch (InterruptedException | ManagedLedgerException 
e) {
                             log.warn("[{}] [{}] Failed to remove non durable 
cursor", topic.getName(), subName, e);
                         }
                     }
                 });
+            } else {
+                topic.getManagedLedger().removeWaitingCursor(cursor);
             }
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index d42b1d92007..c214634e6ed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -80,6 +80,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -113,6 +114,11 @@ public class PersistentTopicTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
+    @Override protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setManagedLedgerCursorBackloggedThreshold(10);
+    }
+
     /**
      * Test validates that broker cleans up topic which failed to unload while 
bundle unloading.
      *
@@ -681,7 +687,7 @@ public class PersistentTopicTest extends BrokerTestBase {
         ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)persistentTopic.getManagedLedger();
         final ManagedCursor spyCursor= 
spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
         doAnswer((invocation) -> {
-            Thread.sleep(10_000);
+            Thread.sleep(5_000);
             invocation.callRealMethod();
             return null;
         }).when(spyCursor).asyncReadEntriesOrWait(any(int.class), 
any(long.class),
@@ -708,4 +714,52 @@ public class PersistentTopicTest extends BrokerTestBase {
                     assertEquals(ledger.getWaitingCursorsCount(), 0);
         });
     }
+
+    @Test
+    public void testAddWaitingCursorsForNonDurable2() throws Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = 
"persistent://prop/ns-test/testAddWaitingCursors2";
+        admin.topics().createNonPartitionedTopic(topicName);
+        pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub-1").subscribe().close();
+        @Cleanup
+        final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topicName).create();
+        for (int i = 0; i < 100; i ++) {
+            producer.sendAsync("test-" + i);
+        }
+        @Cleanup
+        final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionMode(SubscriptionMode.NonDurable)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("sub-2").subscribe();
+        int count = 0;
+        while(true) {
+            final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                count++;
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(count, 100);
+        Thread.sleep(3_000);
+        for (int i = 0; i < 100; i ++) {
+            producer.sendAsync("test-" + i);
+        }
+        while(true) {
+            final Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                count++;
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(count, 200);
+    }
 }

Reply via email to