merlimat closed pull request #1205: Algorithm to find start point of compacted ledger
URL: https://github.com/apache/incubator-pulsar/pull/1205
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
index d1b6e14bc..d36520cfb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
@@ -38,7 +38,7 @@
     private final MessageIdData id;
     private final ByteBuf headersAndPayload;
 
-    RawMessageImpl(MessageIdData id, ByteBuf headersAndPayload) {
+    public RawMessageImpl(MessageIdData id, ByteBuf headersAndPayload) { // public so code outside this package (e.g. CompactedTopicTest) can construct it
         this.id = id;
         this.headersAndPayload = headersAndPayload.retainedSlice();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 068ab4972..0afccd10b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -18,10 +18,149 @@
  */
 package org.apache.pulsar.compaction;
 
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.collect.ComparisonChain;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawMessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CompactedTopicImpl implements CompactedTopic {
+    private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
+
+    /**
+     * Sentinel start point returned by {@link #findStartPoint} when the requested position is at
+     * or after the message id of the last entry in the compacted ledger.
+     */
+    static final long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
+
     @Override
     public void newCompactedLedger(Position p, long compactedLedgerId) {}
+
+    /**
+     * Finds the id of the first compacted-ledger entry whose message id sorts strictly after
+     * {@code p}.
+     *
+     * <p>This is an asynchronous binary search over entry ids {@code [0, lastEntryId]}; the
+     * message id of each probed entry is obtained from {@code cache}. The result is only
+     * meaningful if the entries are ordered by ascending message id.
+     *
+     * @param p the position to look up
+     * @param lastEntryId id of the last entry in the compacted ledger
+     * @param cache maps an entry id to the message id stored in that entry (see {@link #createCache})
+     * @return a future completed with the entry id to start reading from, with
+     *     {@link #NEWER_THAN_COMPACTED} if no entry sorts after {@code p}, or exceptionally if
+     *     an entry lookup fails
+     */
+    static CompletableFuture<Long> findStartPoint(PositionImpl p,
+                                                  long lastEntryId,
+                                                  AsyncLoadingCache<Long, MessageIdData> cache) {
+        CompletableFuture<Long> promise = new CompletableFuture<>();
+        findStartPointLoop(p, 0, lastEntryId, promise, cache);
+        return promise;
+    }
+
+    /**
+     * One step of the search over entry ids {@code [start, end]}: completes {@code promise} with
+     * {@code start} if {@code p} sorts before that entry; otherwise recurses into the half whose
+     * last entry sorts after {@code p}, or completes {@code promise} with
+     * {@link #NEWER_THAN_COMPACTED} if not even the entry at {@code end} does.
+     */
+    private static void findStartPointLoop(PositionImpl p, long start, long end,
+                                           CompletableFuture<Long> promise,
+                                           AsyncLoadingCache<Long, MessageIdData> cache) {
+        long midpoint = start + ((end - start) / 2);
+
+        CompletableFuture<MessageIdData> startEntry = cache.get(start);
+        CompletableFuture<MessageIdData> middleEntry = cache.get(midpoint);
+        CompletableFuture<MessageIdData> endEntry = cache.get(end);
+
+        CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(
+                () -> {
+                    if (comparePositionAndMessageId(p, startEntry.join()) < 0) {
+                        promise.complete(start);
+                    } else if (comparePositionAndMessageId(p, middleEntry.join()) < 0) {
+                        findStartPointLoop(p, start, midpoint, promise, cache);
+                    } else if (comparePositionAndMessageId(p, endEntry.join()) < 0) {
+                        findStartPointLoop(p, midpoint + 1, end, promise, cache);
+                    } else {
+                        promise.complete(NEWER_THAN_COMPACTED);
+                    }
+                }).exceptionally((exception) -> {
+                        // Any failure (e.g. a failed entry lookup) fails the whole search.
+                        promise.completeExceptionally(exception);
+                        return null;
+                    });
+    }
+
+    /**
+     * Creates a cache, bounded to {@code maxSize} entries, that asynchronously loads the message
+     * id stored in a given entry of {@code lh}.
+     */
+    static AsyncLoadingCache<Long, MessageIdData> createCache(LedgerHandle lh,
+                                                              long maxSize) {
+        return Caffeine.newBuilder()
+            .maximumSize(maxSize)
+            .buildAsync((entryId, executor) -> readOneMessageId(lh, entryId));
+    }
+
+    /**
+     * Reads entry {@code entryId} of {@code lh} and extracts the message id stored in it.
+     *
+     * @return a future completed with the entry's message id, or exceptionally if the read or
+     *     the deserialization of the entry fails
+     */
+    private static CompletableFuture<MessageIdData> readOneMessageId(LedgerHandle lh, long entryId) {
+        CompletableFuture<MessageIdData> promise = new CompletableFuture<>();
+
+        lh.asyncReadEntries(entryId, entryId,
+                            (rc, _lh, seq, ctx) -> {
+                                if (rc != BKException.Code.OK) {
+                                    promise.completeExceptionally(BKException.create(rc));
+                                } else {
+                                    try (RawMessage m = RawMessageImpl.deserializeFrom(
+                                                 seq.nextElement().getEntryBuffer())) {
+                                        promise.complete(m.getMessageIdData());
+                                    } catch (NoSuchElementException e) {
+                                        log.error("No such entry {} in ledger {}", entryId, lh.getId());
+                                        promise.completeExceptionally(e);
+                                    } catch (Exception e) {
+                                        // The promise is only completed in this callback, so any
+                                        // other exception escaping it would leave the lookup (and
+                                        // every search waiting on it) pending forever.
+                                        log.error("Failed to read message id from entry {} in ledger {}",
+                                                  entryId, lh.getId(), e);
+                                        promise.completeExceptionally(e);
+                                    }
+                                }
+                            }, null);
+        return promise;
+    }
+
+    /**
+     * Compares a position with a message id by ledger id, then by entry id.
+     *
+     * @return a negative value, zero or a positive value as {@code p} sorts before, equal to, or
+     *     after {@code m}
+     */
+    private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
+        return ComparisonChain.start()
+            .compare(p.getLedgerId(), m.getLedgerId())
+            .compare(p.getEntryId(), m.getEntryId()).result();
+    }
 }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
new file mode 100644
index 000000000..04d4bbc7c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import 
org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawMessageImpl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
+    private static final Logger log = LoggerFactory.getLogger(CompactedTopicTest.class);
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        admin.clusters().createCluster("use",
+                new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT));
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Build a compacted ledger of {@code count} entries with strictly increasing, non-contiguous message ids, and
+     * return its id, the (message id, entry id) of each entry, and each skipped id with the entry id expected after it.
+     */
+    private Triple<Long, List<Pair<MessageIdData,Long>>, List<Pair<MessageIdData,Long>>>
+        buildCompactedLedger(BookKeeper bk, int seed, int count)
+            throws Exception {
+        Random r = new Random(seed);
+        LedgerHandle lh = bk.createLedger(1, 1,
+                                          Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                                          Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+        List<Pair<MessageIdData,Long>> positions = new ArrayList<>();
+        List<Pair<MessageIdData,Long>> idsInGaps = new ArrayList<>();
+        ByteBuf emptyBuffer = Unpooled.buffer(0);
+
+        AtomicLong ledgerIds = new AtomicLong(10L);
+        AtomicLong entryIds = new AtomicLong(0L);
+        CompletableFuture.allOf(
+                IntStream.range(0, count).mapToObj((i) -> {
+                        List<MessageIdData> idsInGap = new ArrayList<MessageIdData>();
+                        if (r.nextInt(10) == 1) {
+                            long delta = r.nextInt(10) + 1;
+                            idsInGap.add(MessageIdData.newBuilder()
+                                         .setLedgerId(ledgerIds.get())
+                                         .setEntryId(entryIds.get() + 1)
+                                         .build());
+                            ledgerIds.addAndGet(delta);
+                            entryIds.set(0);
+                        }
+                        long delta = r.nextInt(5);
+                        if (delta != 0) {
+                            idsInGap.add(MessageIdData.newBuilder()
+                                         .setLedgerId(ledgerIds.get())
+                                         .setEntryId(entryIds.get() + 1)
+                                         .build());
+                        }
+                        MessageIdData id = MessageIdData.newBuilder()
+                            .setLedgerId(ledgerIds.get())
+                            .setEntryId(entryIds.addAndGet(delta + 1)).build();
+                        RawMessage m = new RawMessageImpl(id, emptyBuffer);
+
+                        CompletableFuture<Void> f = new CompletableFuture<>();
+                        lh.asyncAddEntry(m.serialize(),
+                                (rc, ledger, eid, ctx) -> {
+                                     if (rc != BKException.Code.OK) {
+                                         f.completeExceptionally(BKException.create(rc));
+                                     } else {
+                                         positions.add(Pair.of(id, eid));
+                                         idsInGap.forEach((gid) -> idsInGaps.add(Pair.of(gid, eid)));
+                                         f.complete(null);
+                                     }
+                                }, null);
+                        return f;
+                    }).toArray(CompletableFuture[]::new)).get();
+        lh.close();
+
+        return Triple.of(lh.getId(), positions, idsInGaps);
+    }
+
+    @Test
+    public void testEntryLookup() throws Exception {
+        BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
+                this.conf, null);
+
+        Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData
+            = buildCompactedLedger(bk, 0, 500);
+
+        List<Pair<MessageIdData, Long>> positions = compactedLedgerData.getMiddle();
+        List<Pair<MessageIdData, Long>> idsInGaps = compactedLedgerData.getRight();
+
+        LedgerHandle lh = bk.openLedger(compactedLedgerData.getLeft(),
+                                        Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                                        Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+        long lastEntryId = lh.getLastAddConfirmed();
+        AsyncLoadingCache<Long,MessageIdData> cache = CompactedTopicImpl.createCache(lh, 50);
+
+        MessageIdData firstPositionId = positions.get(0).getLeft();
+        Pair<MessageIdData, Long> lastPosition = positions.get(positions.size() - 1);
+
+        // check ids before and after ids in compacted ledger
+        Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(0, 0), lastEntryId, cache).get(),
+                            Long.valueOf(0));
+        Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(Long.MAX_VALUE, 0),
+                                                              lastEntryId, cache).get(),
+                            Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
+
+        // (ledger, 0) is never a message id in the compacted ledger, as generated entry ids start at 1
+        Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(firstPositionId.getLedgerId(), 0),
+                                                              lastEntryId, cache).get(),
+                            Long.valueOf(0));
+        // check next id after last id in compacted ledger
+        Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(lastPosition.getLeft().getLedgerId(),
+                                                                               lastPosition.getLeft().getEntryId() + 1),
+                                                              lastEntryId, cache).get(),
+                            Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
+
+        // shuffle to make cache work hard
+        Collections.shuffle(positions);
+        Collections.shuffle(idsInGaps);
+
+        // Check ids we know are in compacted ledger
+        for (Pair<MessageIdData, Long> p : positions) {
+            PositionImpl pos = new PositionImpl(p.getLeft().getLedgerId(), p.getLeft().getEntryId());
+            if (p.equals(lastPosition)) {
+                Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(),
+                                    Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
+            } else {
+                Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(),
+                            Long.valueOf(p.getRight() + 1));
+            }
+        }
+
+        // Check ids we know are in the gaps of the compacted ledger
+        for (Pair<MessageIdData, Long> gap : idsInGaps) {
+            PositionImpl pos = new PositionImpl(gap.getLeft().getLedgerId(), gap.getLeft().getEntryId());
+            Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(),
+                                Long.valueOf(gap.getRight()));
+        }
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to