This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new c934df79234 [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)
c934df79234 is described below

commit c934df792343fa169178f85331a0d15cae2c149e
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Sun May 7 14:21:51 2023 +0800

    [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)
    
    (cherry picked from commit 98413642995eb6e562f6a591dcf56e20ac0cc7ef)
---
 .../service/persistent/PersistentReplicator.java   |  8 ++++
 .../pulsar/broker/service/ReplicatorTest.java      | 46 +++++++++++++++-------
 2 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 76a032c0217..ceddfc47749 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -669,6 +669,14 @@ public class PersistentReplicator extends AbstractReplicator
     public void deleteFailed(ManagedLedgerException exception, Object ctx) {
         log.error("[{}][{} -> {}] Failed to delete message at {}: {}", topicName, localCluster, remoteCluster, ctx,
                 exception.getMessage(), exception);
+        if (exception instanceof CursorAlreadyClosedException) {
+            log.error("[{}][{} -> {}] Asynchronous ack failure because replicator is already deleted and cursor is"
+                    + " already closed {}, ({})", topic, localCluster, remoteCluster, ctx, exception.getMessage(),
+                    exception);
+            // replicator is already deleted and cursor is already closed so, producer should also be stopped
+            closeProducerAsync();
+            return;
+        }
         if (ctx instanceof PositionImpl) {
             PositionImpl deletedEntry = (PositionImpl) ctx;
             if (deletedEntry.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 1e76d216986..158d223336b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
@@ -50,9 +51,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
-
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -60,7 +62,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -1614,20 +1615,41 @@ public class ReplicatorTest extends ReplicatorTestBase {
         log.info("--- Starting ReplicatorTest::testReplication ---");
 
         String namespace = "pulsar/global/ns2";
-        admin1.namespaces().createNamespace(namespace);
-        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
         final TopicName dest = TopicName
                 .get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic"));
 
         @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
-        log.info("--- Starting producer --- " + url1);
 
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false)
+                .getNow(null).get();
+        final ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+        final ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.openCursor("pulsar.repl.r2");
+        final ManagedCursorImpl spyCursor = spy(cursor);
+        managedLedger.getCursors().removeCursor(cursor.getName());
+        managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST);
+        AtomicBoolean isMakeAckFail = new AtomicBoolean(false);
+        doAnswer(invocation -> {
+            Position pos = (Position) invocation.getArguments()[0];
+            AsyncCallbacks.DeleteCallback cb = (AsyncCallbacks.DeleteCallback) invocation.getArguments()[1];
+            Object ctx = invocation.getArguments()[2];
+            if (isMakeAckFail.get()) {
+                log.info("async-delete {} will be failed", pos);
+                cb.deleteFailed(new ManagedLedgerException("mocked error"), ctx);
+            } else {
+                log.info("async-delete {} will success", pos);
+                cursor.asyncDelete(pos, cb, ctx);
+            }
+            return null;
+        }).when(spyCursor).asyncDelete(Mockito.any(Position.class), Mockito.any(AsyncCallbacks.DeleteCallback.class),
+                Mockito.any());
+
+        log.info("--- Starting producer --- " + url1);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
         // Produce from cluster1 and consume from the rest
         producer1.produce(2);
 
-        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false)
-                .getNow(null).get();
         MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
         Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
         ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators();
@@ -1636,25 +1658,19 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
                         replicator.getState()));
-
         assertEquals(replicator.getState(), org.apache.pulsar.broker.service.AbstractReplicator.State.Started);
-        ManagedCursorImpl cursor = (ManagedCursorImpl) replicator.getCursor();
 
         // Make sure all the data has replicated to the remote cluster before close the cursor.
         Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition));
 
-        cursor.setState(State.Closed);
-
-        Field field = ManagedCursorImpl.class.getDeclaredField("state");
-        field.setAccessible(true);
-        field.set(cursor, State.Closed);
+        isMakeAckFail.set(true);
 
         producer1.produce(10);
 
         // The cursor is closed, so the mark delete position will not move forward.
         assertEquals(cursor.getMarkDeletedPosition(), lastPosition);
 
-        field.set(cursor, State.Open);
+        isMakeAckFail.set(false);
 
         Awaitility.await().timeout(30, TimeUnit.SECONDS).until(
                 () -> {

Reply via email to