This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6152cc82ac8 [fix][broker] Fix avoid future of clear delayed message can't complete (#20075) 6152cc82ac8 is described below commit 6152cc82ac8a344cfc2539c56f15cec6f4a1cff7 Author: Cong Zhao <zhaoc...@apache.org> AuthorDate: Sat Apr 15 13:42:07 2023 +0800 [fix][broker] Fix avoid future of clear delayed message can't complete (#20075) --- .../BucketDelayedDeliveryTrackerFactory.java | 28 +++++++++++++ .../pulsar/broker/delayed/bucket/Bucket.java | 3 +- .../bucket/BucketDelayedDeliveryTracker.java | 4 +- .../PersistentDispatcherMultipleConsumers.java | 17 ++++---- .../broker/service/persistent/PersistentTopic.java | 26 +++++++----- .../persistent/BucketDelayedDeliveryTest.java | 48 ++++++++++++++++++++++ 6 files changed, 102 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java index f0feb8b27d6..6a00bfd1995 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java @@ -21,13 +21,19 @@ package org.apache.pulsar.broker.delayed; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage; import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.apache.pulsar.common.util.FutureUtil; public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory { @@ -72,6 +78,28 @@ public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrack delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets); } + /** + * Clean up residual snapshot data. + * If tracker has not been created or has been closed, then we can't clean up the snapshot with `tracker.clear`, + * this method can clean up the residual snapshots without creating a tracker. + */ + public CompletableFuture<Void> cleanResidualSnapshots(ManagedCursor cursor) { + Map<String, String> cursorProperties = cursor.getCursorProperties(); + List<CompletableFuture<Void>> futures = new ArrayList<>(); + FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create(); + cursorProperties.forEach((k, v) -> { + if (k != null && v != null && k.startsWith(BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX)) { + CompletableFuture<Void> future = sequencer.sequential(() -> { + return cursor.removeCursorProperty(k) + .thenCompose(__ -> bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v))); + }); + futures.add(future); + } + }); + + return FutureUtil.waitForAll(futures); + } + @Override public void close() throws Exception { if (bucketSnapshotStorage != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java index 5b7023be503..4d7d3aa512b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.broker.delayed.bucket; -import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX; import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry; +import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,7 +41,6 @@ import org.roaringbitmap.RoaringBitmap; @AllArgsConstructor abstract class Bucket { - static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket"; static final String DELIMITER = "_"; static final int MaxRetryTimes = 3; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index a34bd51af98..6ead1e207b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.delayed.bucket; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX; +import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX; import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER; import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes; import com.google.common.annotations.VisibleForTesting; @@ -66,6 +66,8 @@ import org.roaringbitmap.RoaringBitmap; @ThreadSafe public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { + public static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket"; + static final CompletableFuture<Long> NULL_LONG_PROMISE = CompletableFuture.completedFuture(null); static final int AsyncOperationTimeoutSeconds = 60; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 4ac755860fc..c60b4562bf1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -49,6 +49,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; @@ -1098,19 +1099,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul return CompletableFuture.completedFuture(null); } - if (delayedDeliveryTracker.isEmpty() && topic.getBrokerService() - .getDelayedDeliveryTrackerFactory() instanceof BucketDelayedDeliveryTrackerFactory) { - synchronized (this) { - if (delayedDeliveryTracker.isEmpty()) { - delayedDeliveryTracker = Optional - .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); - } - } - } - if (delayedDeliveryTracker.isPresent()) { return this.delayedDeliveryTracker.get().clear(); } else { + DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory = + topic.getBrokerService().getDelayedDeliveryTrackerFactory(); + if (delayedDeliveryTrackerFactory instanceof BucketDelayedDeliveryTrackerFactory + bucketDelayedDeliveryTrackerFactory) { + return bucketDelayedDeliveryTrackerFactory.cleanResidualSnapshots(cursor); + } return CompletableFuture.completedFuture(null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0f5e6043981..fe181bb1c01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -81,6 +81,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -1169,15 +1170,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } Dispatcher dispatcher = persistentSubscription.getDispatcher(); - final Dispatcher temporaryDispatcher; if (dispatcher == null) { - log.info("[{}][{}] Dispatcher is null, try to create temporary dispatcher to clear delayed message", topic, - subscriptionName); - dispatcher = temporaryDispatcher = - new PersistentDispatcherMultipleConsumers(this, persistentSubscription.cursor, - persistentSubscription); - } else { - temporaryDispatcher = null; + DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory = + brokerService.getDelayedDeliveryTrackerFactory(); + if (delayedDeliveryTrackerFactory instanceof BucketDelayedDeliveryTrackerFactory + bucketDelayedDeliveryTrackerFactory) { + ManagedCursor cursor = persistentSubscription.getCursor(); + bucketDelayedDeliveryTrackerFactory.cleanResidualSnapshots(cursor).whenComplete((__, ex) -> { + if (ex != null) { + unsubscribeFuture.completeExceptionally(ex); + } else { + asyncDeleteCursor(subscriptionName, unsubscribeFuture); + } + }); + } + return; } dispatcher.clearDelayedMessages().whenComplete((__, ex) -> { @@ -1186,9 +1193,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { asyncDeleteCursor(subscriptionName, unsubscribeFuture); } - if (temporaryDispatcher != null) { - temporaryDispatcher.close(); - } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java index 5480a2e7a70..0a82b2b4c3c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java @@ -305,4 +305,52 @@ public class BucketDelayedDeliveryTest extends DelayedDeliveryTest { assertTrue(namespaceMetric.isPresent()); assertEquals(6, namespaceMetric.get().value); } + + @Test + public void testDelete() throws Exception { + String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelete"); + + @Cleanup + Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + for (int i = 0; i < 1000; i++) { + producer.newMessage() + .value("msg") + .deliverAfter(1, TimeUnit.HOURS) + .send(); + } + + Dispatcher dispatcher = pulsar.getBrokerService().getTopicReference(topic).get().getSubscription("sub").getDispatcher(); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1000)); + + Map<String, String> cursorProperties = + ((PersistentDispatcherMultipleConsumers) dispatcher).getCursor().getCursorProperties(); + List<Long> bucketIds = cursorProperties.entrySet().stream() + .filter(x -> x.getKey().startsWith(CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket")).map( + x -> Long.valueOf(x.getValue())).toList(); + + assertTrue(bucketIds.size() > 0); + + admin.topics().delete(topic, true); + + for (Long bucketId : bucketIds) { + try { + LedgerHandle ledgerHandle = + pulsarTestContext.getBookKeeperClient() + .openLedger(bucketId, BookKeeper.DigestType.CRC32C, new byte[]{}); + Assert.fail("Should fail"); + } catch (BKException.BKNoSuchLedgerExistsException e) { + // ignore it + } + } + } }