This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f3572574b132dfb6d5a67011f7fb68096e69e1c7
Author: Christophe Bornet <cbor...@hotmail.com>
AuthorDate: Mon Apr 24 11:48:03 2023 +0200

    Revert "[improve][broker] Optimize delayed metadata index bitmap (#20136)"
    
    This reverts commit ff59240165c73a9c3a3dcca20702ab44b0b18d33.
---
 .../bucket/BucketDelayedDeliveryTracker.java       |  3 --
 .../broker/delayed/bucket/ImmutableBucket.java     | 53 +++++++++-------------
 .../broker/delayed/bucket/MutableBucket.java       | 11 ++---
 3 files changed, 25 insertions(+), 42 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index b17387e276e..b4d1745e22f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -485,9 +485,6 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
                                     });
                                 });
                             }
-
-                            // optimize bm
-                            delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
                             immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
 
                             afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 57de5c84fcd..82e98cefa5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -20,10 +20,11 @@ package org.apache.pulsar.broker.delayed.bucket;
 
 import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
 import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE;
-import java.io.IOException;
+import com.google.protobuf.ByteString;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
@@ -36,8 +37,8 @@ import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotF
 import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
 import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.roaringbitmap.InvalidRoaringFormat;
 import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 @Slf4j
 class ImmutableBucket extends Bucket {
@@ -97,7 +98,7 @@ class ImmutableBucket extends Bucket {
                         this.setLastSegmentEntryId(metadataList.size());
                         this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
                         List<Long> firstScheduleTimestamps = metadataList.stream().map(
-                                SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
+                                        SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
                         this.setFirstScheduleTimestamps(firstScheduleTimestamps);
 
                         return nextSnapshotEntryIndex + 1;
@@ -138,37 +139,25 @@ class ImmutableBucket extends Bucket {
         });
     }
 
-    /**
-     * Recover delayed index bit map and message numbers.
-     * @throws InvalidRoaringFormat invalid bitmap serialization format
-     */
     private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
-                                                    List<SnapshotSegmentMetadata> segmentMetaList) {
-        delayedIndexBitMap.clear(); // cleanup dirty bm
-        final var numberMessages = new MutableLong(0);
-        for (int i = startSnapshotIndex; i < segmentMetaList.size(); i++) {
-            for (final var entry : segmentMetaList.get(i).getDelayedIndexBitMapMap().entrySet()) {
-                final var ledgerId = entry.getKey();
-                final var bs = entry.getValue();
-                final var sbm = new RoaringBitmap();
-                try {
-                    sbm.deserialize(bs.asReadOnlyByteBuffer());
-                } catch (IOException e) {
-                    throw new InvalidRoaringFormat(e.getMessage());
+                                                    List<SnapshotSegmentMetadata> segmentMetadata) {
+        this.delayedIndexBitMap.clear();
+        MutableLong numberMessages = new MutableLong(0);
+        for (int i = startSnapshotIndex; i < segmentMetadata.size(); i++) {
+            Map<Long, ByteString> bitByteStringMap = segmentMetadata.get(i).getDelayedIndexBitMapMap();
+            bitByteStringMap.forEach((leaderId, bitSetString) -> {
+                boolean exist = this.delayedIndexBitMap.containsKey(leaderId);
+                RoaringBitmap bitSet =
+                        new ImmutableRoaringBitmap(bitSetString.asReadOnlyByteBuffer()).toRoaringBitmap();
+                numberMessages.add(bitSet.getCardinality());
+                if (!exist) {
+                    this.delayedIndexBitMap.put(leaderId, bitSet);
+                } else {
+                    this.delayedIndexBitMap.get(leaderId).or(bitSet);
                 }
-                numberMessages.add(sbm.getCardinality());
-                delayedIndexBitMap.compute(ledgerId, (lId, bm) -> {
-                    if (bm == null) {
-                        return sbm;
-                    }
-                    bm.or(sbm);
-                    return bm;
-                });
-            }
+            });
         }
-        // optimize bm
-        delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
-        setNumberBucketDelayedMessages(numberMessages.getValue());
+        this.setNumberBucketDelayedMessages(numberMessages.getValue());
     }
 
     CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() {
@@ -204,7 +193,7 @@ class ImmutableBucket extends Bucket {
                         stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.delete);
                     } else {
                         log.info("[{}] Delete bucket snapshot finish, bucketId: {}, bucketKey: {}",
-                                dispatcherName, bucketId, bucketKey);
+                                 dispatcherName, bucketId, bucketKey);
 
                         stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete,
                                 System.currentTimeMillis() - deleteStartTime);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index f404d5d02c1..e49ebe9606e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -116,13 +116,10 @@ class MutableBucket extends Bucket implements AutoCloseable {
 
                 Iterator<Map.Entry<Long, RoaringBitmap>> iterator = bitMap.entrySet().iterator();
                 while (iterator.hasNext()) {
-                    final var entry = iterator.next();
-                    final var lId = entry.getKey();
-                    final var bm = entry.getValue();
-                    bm.runOptimize();
-                    final var array = new byte[bm.serializedSizeInBytes()];
-                    bm.serialize(ByteBuffer.wrap(array));
-                    segmentMetadataBuilder.putDelayedIndexBitMap(lId, ByteString.copyFrom(array));
+                    Map.Entry<Long, RoaringBitmap> entry = iterator.next();
+                    byte[] array = new byte[entry.getValue().serializedSizeInBytes()];
+                    entry.getValue().serialize(ByteBuffer.wrap(array));
+                    segmentMetadataBuilder.putDelayedIndexBitMap(entry.getKey(), ByteString.copyFrom(array));
                     iterator.remove();
                 }
                     iterator.remove();
                 }
 

Reply via email to