This is an automated email from the ASF dual-hosted git repository. cbornet pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f3572574b132dfb6d5a67011f7fb68096e69e1c7 Author: Christophe Bornet <cbor...@hotmail.com> AuthorDate: Mon Apr 24 11:48:03 2023 +0200 Revert "[improve][broker] Optimize delayed metadata index bitmap (#20136)" This reverts commit ff59240165c73a9c3a3dcca20702ab44b0b18d33. --- .../bucket/BucketDelayedDeliveryTracker.java | 3 -- .../broker/delayed/bucket/ImmutableBucket.java | 53 +++++++++------------- .../broker/delayed/bucket/MutableBucket.java | 11 ++--- 3 files changed, 25 insertions(+), 42 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index b17387e276e..b4d1745e22f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -485,9 +485,6 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker }); }); } - - // optimize bm - delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize); immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap); afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 57de5c84fcd..82e98cefa5d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -20,10 +20,11 @@ package org.apache.pulsar.broker.delayed.bucket; import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry; import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE; -import java.io.IOException; +import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -36,8 +37,8 @@ import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotF import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; import org.apache.pulsar.common.util.FutureUtil; -import org.roaringbitmap.InvalidRoaringFormat; import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @Slf4j class ImmutableBucket extends Bucket { @@ -97,7 +98,7 @@ class ImmutableBucket extends Bucket { this.setLastSegmentEntryId(metadataList.size()); this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList); List<Long> firstScheduleTimestamps = metadataList.stream().map( - SnapshotSegmentMetadata::getMinScheduleTimestamp).toList(); + SnapshotSegmentMetadata::getMinScheduleTimestamp).toList(); this.setFirstScheduleTimestamps(firstScheduleTimestamps); return nextSnapshotEntryIndex + 1; @@ -138,37 +139,25 @@ class ImmutableBucket extends Bucket { }); } - /** - * Recover delayed index bit map and message numbers. - * @throws InvalidRoaringFormat invalid bitmap serialization format - */ private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex, - List<SnapshotSegmentMetadata> segmentMetaList) { - delayedIndexBitMap.clear(); // cleanup dirty bm - final var numberMessages = new MutableLong(0); - for (int i = startSnapshotIndex; i < segmentMetaList.size(); i++) { - for (final var entry : segmentMetaList.get(i).getDelayedIndexBitMapMap().entrySet()) { - final var ledgerId = entry.getKey(); - final var bs = entry.getValue(); - final var sbm = new RoaringBitmap(); - try { - sbm.deserialize(bs.asReadOnlyByteBuffer()); - } catch (IOException e) { - throw new InvalidRoaringFormat(e.getMessage()); + List<SnapshotSegmentMetadata> segmentMetadata) { + this.delayedIndexBitMap.clear(); + MutableLong numberMessages = new MutableLong(0); + for (int i = startSnapshotIndex; i < segmentMetadata.size(); i++) { + Map<Long, ByteString> bitByteStringMap = segmentMetadata.get(i).getDelayedIndexBitMapMap(); + bitByteStringMap.forEach((leaderId, bitSetString) -> { + boolean exist = this.delayedIndexBitMap.containsKey(leaderId); + RoaringBitmap bitSet = + new ImmutableRoaringBitmap(bitSetString.asReadOnlyByteBuffer()).toRoaringBitmap(); + numberMessages.add(bitSet.getCardinality()); + if (!exist) { + this.delayedIndexBitMap.put(leaderId, bitSet); + } else { + this.delayedIndexBitMap.get(leaderId).or(bitSet); } - numberMessages.add(sbm.getCardinality()); - delayedIndexBitMap.compute(ledgerId, (lId, bm) -> { - if (bm == null) { - return sbm; - } - bm.or(sbm); - return bm; - }); - } + }); } - // optimize bm - delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize); - setNumberBucketDelayedMessages(numberMessages.getValue()); + this.setNumberBucketDelayedMessages(numberMessages.getValue()); } CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() { @@ -204,7 +193,7 @@ class ImmutableBucket extends Bucket { stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.delete); } else { log.info("[{}] Delete bucket snapshot finish, bucketId: {}, bucketKey: {}", - dispatcherName, bucketId, bucketKey); + dispatcherName, bucketId, bucketKey); stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete, System.currentTimeMillis() - deleteStartTime); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index f404d5d02c1..e49ebe9606e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -116,13 +116,10 @@ class MutableBucket extends Bucket implements AutoCloseable { Iterator<Map.Entry<Long, RoaringBitmap>> iterator = bitMap.entrySet().iterator(); while (iterator.hasNext()) { - final var entry = iterator.next(); - final var lId = entry.getKey(); - final var bm = entry.getValue(); - bm.runOptimize(); - final var array = new byte[bm.serializedSizeInBytes()]; - bm.serialize(ByteBuffer.wrap(array)); - segmentMetadataBuilder.putDelayedIndexBitMap(lId, ByteString.copyFrom(array)); + Map.Entry<Long, RoaringBitmap> entry = iterator.next(); + byte[] array = new byte[entry.getValue().serializedSizeInBytes()]; + entry.getValue().serialize(ByteBuffer.wrap(array)); + segmentMetadataBuilder.putDelayedIndexBitMap(entry.getKey(), ByteString.copyFrom(array)); iterator.remove(); }