This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3a5ffb82eb7be4197db08d25de143b2dc9486269 Author: Cong Zhao <zhaoc...@apache.org> AuthorDate: Fri Apr 21 16:06:22 2023 +0800 [improve][broker] Move bitmap from lastMutableBucket to ImmutableBucket (#20156) (cherry picked from commit e5a833a2dcb7ce13ada4ca94714cc045a02de276) --- .../bucket/BucketDelayedDeliveryTracker.java | 3 +- .../broker/delayed/bucket/MutableBucket.java | 41 +++++++++++++++------- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index c90064c9137..67a7de1f013 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -168,7 +168,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker } try { - FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 2, TimeUnit.SECONDS); + FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 5, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error("[{}] Failed to recover delayed message index bucket snapshot.", dispatcher.getName(), e); if (e instanceof InterruptedException) { @@ -343,6 +343,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range, // enter sharedBucketPriorityQueue directly sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId); + lastMutableBucket.putIndexBit(ledgerId, entryId); } else { checkArgument(ledgerId >= lastMutableBucket.endLedgerId); lastMutableBucket.addMessage(ledgerId, entryId, deliverAt); diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index b7e9e68f1bd..1173a401a89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.delayed.bucket; import static com.google.common.base.Preconditions.checkArgument; -import com.google.protobuf.ByteString; +import com.google.protobuf.UnsafeByteOperations; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -74,6 +74,8 @@ class MutableBucket extends Bucket implements AutoCloseable { List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>(); List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>(); + Map<Long, RoaringBitmap> immutableBucketBitMap = new HashMap<>(); + Map<Long, RoaringBitmap> bitMap = new HashMap<>(); SnapshotSegment snapshotSegment = new SnapshotSegment(); SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder(); @@ -82,18 +84,20 @@ class MutableBucket extends Bucket implements AutoCloseable { long currentTimestampUpperLimit = 0; long currentFirstTimestamp = 0L; while (!delayedIndexQueue.isEmpty()) { - DelayedIndex delayedIndex = snapshotSegment.addIndexe(); - delayedIndexQueue.popToObject(delayedIndex); - - long timestamp = delayedIndex.getTimestamp(); + final long timestamp = delayedIndexQueue.peekTimestamp(); if (currentTimestampUpperLimit == 0) { currentFirstTimestamp = timestamp; firstScheduleTimestamps.add(currentFirstTimestamp); currentTimestampUpperLimit = timestamp + timeStepPerBucketSnapshotSegment - 1; } - long ledgerId = delayedIndex.getLedgerId(); - long entryId = delayedIndex.getEntryId(); + DelayedIndex delayedIndex = snapshotSegment.addIndexe(); + 
delayedIndexQueue.popToObject(delayedIndex); + + final long ledgerId = delayedIndex.getLedgerId(); + final long entryId = delayedIndex.getEntryId(); + + removeIndexBit(ledgerId, entryId); checkArgument(ledgerId >= startLedgerId && ledgerId <= endLedgerId); @@ -102,10 +106,10 @@ class MutableBucket extends Bucket implements AutoCloseable { sharedQueue.add(timestamp, ledgerId, entryId); } - numMessages++; - bitMap.computeIfAbsent(ledgerId, k -> new RoaringBitmap()).add(entryId, entryId + 1); + numMessages++; + if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit || (maxIndexesPerBucketSnapshotSegment != -1 && snapshotSegment.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) { @@ -119,9 +123,17 @@ class MutableBucket extends Bucket implements AutoCloseable { final var lId = entry.getKey(); final var bm = entry.getValue(); bm.runOptimize(); - final var array = new byte[bm.serializedSizeInBytes()]; - bm.serialize(ByteBuffer.wrap(array)); - segmentMetadataBuilder.putDelayedIndexBitMap(lId, ByteString.copyFrom(array)); + ByteBuffer byteBuffer = ByteBuffer.allocate(bm.serializedSizeInBytes()); + bm.serialize(byteBuffer); + byteBuffer.flip(); + segmentMetadataBuilder.putDelayedIndexBitMap(lId, UnsafeByteOperations.unsafeWrap(byteBuffer)); + immutableBucketBitMap.compute(lId, (__, bm0) -> { + if (bm0 == null) { + return bm; + } + bm0.or(bm); + return bm0; + }); iterator.remove(); } @@ -133,6 +145,10 @@ class MutableBucket extends Bucket implements AutoCloseable { } } + // optimize bm + immutableBucketBitMap.values().forEach(RoaringBitmap::runOptimize); + this.delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize); + SnapshotMetadata bucketSnapshotMetadata = SnapshotMetadata.newBuilder() .addAllMetadataList(segmentMetadataList) .build(); @@ -145,6 +161,7 @@ class MutableBucket extends Bucket implements AutoCloseable { bucket.setNumberBucketDelayedMessages(numMessages); 
bucket.setLastSegmentEntryId(lastSegmentEntryId); bucket.setFirstScheduleTimestamps(firstScheduleTimestamps); + bucket.setDelayedIndexBitMap(immutableBucketBitMap); // Skip first segment, because it has already been loaded List<SnapshotSegment> snapshotSegments = bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size());