merlimat closed pull request #1205: Algorithm to find start point of compacted ledger. URL: https://github.com/apache/incubator-pulsar/pull/1205
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java index d1b6e14bc..d36520cfb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java @@ -38,7 +38,7 @@ private final MessageIdData id; private final ByteBuf headersAndPayload; - RawMessageImpl(MessageIdData id, ByteBuf headersAndPayload) { + public RawMessageImpl(MessageIdData id, ByteBuf headersAndPayload) { this.id = id; this.headersAndPayload = headersAndPayload.retainedSlice(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 068ab4972..0afccd10b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -18,10 +18,100 @@ */ package org.apache.pulsar.compaction; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.ComparisonChain; + +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import 
org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.impl.RawMessageImpl; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CompactedTopicImpl implements CompactedTopic { + final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL; + @Override public void newCompactedLedger(Position p, long compactedLedgerId) {} + + static CompletableFuture<Long> findStartPoint(PositionImpl p, + long lastEntryId, + AsyncLoadingCache<Long,MessageIdData> cache) { + CompletableFuture<Long> promise = new CompletableFuture<>(); + findStartPointLoop(p, 0, lastEntryId, promise, cache); + return promise; + } + + private static void findStartPointLoop(PositionImpl p, long start, long end, + CompletableFuture<Long> promise, + AsyncLoadingCache<Long,MessageIdData> cache) { + long midpoint = start + ((end - start) / 2); + + CompletableFuture<MessageIdData> startEntry = cache.get(start); + CompletableFuture<MessageIdData> middleEntry = cache.get(midpoint); + CompletableFuture<MessageIdData> endEntry = cache.get(end); + + CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun( + () -> { + if (comparePositionAndMessageId(p, startEntry.join()) < 0) { + promise.complete(start); + } else if (comparePositionAndMessageId(p, middleEntry.join()) < 0) { + findStartPointLoop(p, start, midpoint, promise, cache); + } else if (comparePositionAndMessageId(p, endEntry.join()) < 0) { + findStartPointLoop(p, midpoint + 1, end, promise, cache); + } else { + promise.complete(NEWER_THAN_COMPACTED); + } + }).exceptionally((exception) -> { + promise.completeExceptionally(exception); + return null; + }); + } + + static AsyncLoadingCache<Long,MessageIdData> createCache(LedgerHandle lh, + long maxSize) { + return Caffeine.newBuilder() + .maximumSize(maxSize) + .buildAsync((entryId, executor) -> readOneMessageId(lh, entryId)); + } + + + private static CompletableFuture<MessageIdData> 
readOneMessageId(LedgerHandle lh, long entryId) { + CompletableFuture<MessageIdData> promise = new CompletableFuture<>(); + + lh.asyncReadEntries(entryId, entryId, + (rc, _lh, seq, ctx) -> { + if (rc != BKException.Code.OK) { + promise.completeExceptionally(BKException.create(rc)); + } else { + try (RawMessage m = RawMessageImpl.deserializeFrom( + seq.nextElement().getEntryBuffer())) { + promise.complete(m.getMessageIdData()); + } catch (NoSuchElementException e) { + log.error("No such entry {} in ledger {}", entryId, lh.getId()); + promise.completeExceptionally(e); + } + } + }, null); + return promise; + } + + private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) { + return ComparisonChain.start() + .compare(p.getLedgerId(), m.getLedgerId()) + .compare(p.getEntryId(), m.getEntryId()).result(); + } + private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java new file mode 100644 index 000000000..04d4bbc7c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.apache.pulsar.client.impl.RawReaderTest.extractKey; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PropertyAdmin; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.MessageBuilder; +import 
org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.impl.RawMessageImpl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class CompactedTopicTest extends MockedPulsarServiceBaseTest { + private static final Logger log = LoggerFactory.getLogger(CompactedTopicTest.class); + + @BeforeMethod + @Override + public void setup() throws Exception { + super.internalSetup(); + + admin.clusters().createCluster("use", + new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT)); + admin.properties().createProperty("my-property", + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-property/use/my-ns"); + } + + @AfterMethod + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + /** + * Build a compacted ledger, and return the id of the ledger, the position of the different + * entries in the ledger, and a list of gaps, and the entry which should be returned after the gap. 
+ */ + private Triple<Long, List<Pair<MessageIdData,Long>>, List<Pair<MessageIdData,Long>>> + buildCompactedLedger(BookKeeper bk, int seed, int count) + throws Exception { + Random r = new Random(seed); + LedgerHandle lh = bk.createLedger(1, 1, + Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, + Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD); + List<Pair<MessageIdData,Long>> positions = new ArrayList<>(); + List<Pair<MessageIdData,Long>> idsInGaps = new ArrayList<>(); + ByteBuf emptyBuffer = Unpooled.buffer(0); + + AtomicLong ledgerIds = new AtomicLong(10L); + AtomicLong entryIds = new AtomicLong(0L); + CompletableFuture.allOf( + IntStream.range(0, count).mapToObj((i) -> { + List<MessageIdData> idsInGap = new ArrayList<MessageIdData>(); + if (r.nextInt(10) == 1) { + long delta = r.nextInt(10) + 1; + idsInGap.add(MessageIdData.newBuilder() + .setLedgerId(ledgerIds.get()) + .setEntryId(entryIds.get() + 1) + .build()); + ledgerIds.addAndGet(delta); + entryIds.set(0); + } + long delta = r.nextInt(5); + if (delta != 0) { + idsInGap.add(MessageIdData.newBuilder() + .setLedgerId(ledgerIds.get()) + .setEntryId(entryIds.get() + 1) + .build()); + } + MessageIdData id = MessageIdData.newBuilder() + .setLedgerId(ledgerIds.get()) + .setEntryId(entryIds.addAndGet(delta + 1)).build(); + RawMessage m = new RawMessageImpl(id, emptyBuffer); + + CompletableFuture<Void> f = new CompletableFuture<>(); + lh.asyncAddEntry(m.serialize(), + (rc, ledger, eid, ctx) -> { + if (rc != BKException.Code.OK) { + f.completeExceptionally(BKException.create(rc)); + } else { + positions.add(Pair.of(id, eid)); + idsInGap.forEach((gid) -> idsInGaps.add(Pair.of(gid, eid))); + f.complete(null); + } + }, null); + return f; + }).toArray(CompletableFuture[]::new)).get(); + lh.close(); + + return Triple.of(lh.getId(), positions, idsInGaps); + } + + @Test + public void testEntryLookup() throws Exception { + BookKeeper bk = pulsar.getBookKeeperClientFactory().create( + this.conf, null); + + Triple<Long, 
List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData + = buildCompactedLedger(bk, 0, 500); + + List<Pair<MessageIdData, Long>> positions = compactedLedgerData.getMiddle(); + List<Pair<MessageIdData, Long>> idsInGaps = compactedLedgerData.getRight(); + + LedgerHandle lh = bk.openLedger(compactedLedgerData.getLeft(), + Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, + Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD); + long lastEntryId = lh.getLastAddConfirmed(); + AsyncLoadingCache<Long,MessageIdData> cache = CompactedTopicImpl.createCache(lh, 50); + + MessageIdData firstPositionId = positions.get(0).getLeft(); + Pair<MessageIdData, Long> lastPosition = positions.get(positions.size() - 1); + + // check ids before and after ids in compacted ledger + Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(0, 0), lastEntryId, cache).get(), + Long.valueOf(0)); + Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(Long.MAX_VALUE, 0), + lastEntryId, cache).get(), + Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED)); + + // entry 0 is never in compacted ledger due to how we generate dummy + Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(firstPositionId.getLedgerId(), 0), + lastEntryId, cache).get(), + Long.valueOf(0)); + // check next id after last id in compacted ledger + Assert.assertEquals(CompactedTopicImpl.findStartPoint(new PositionImpl(lastPosition.getLeft().getLedgerId(), + lastPosition.getLeft().getEntryId() + 1), + lastEntryId, cache).get(), + Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED)); + + // shuffle to make cache work hard + Collections.shuffle(positions); + Collections.shuffle(idsInGaps); + + // Check ids we know are in compacted ledger + for (Pair<MessageIdData, Long> p : positions) { + PositionImpl pos = new PositionImpl(p.getLeft().getLedgerId(), p.getLeft().getEntryId()); + if (p.equals(lastPosition)) { + 
Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(), + Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED)); + } else { + Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(), + Long.valueOf(p.getRight() + 1)); + } + } + + // Check ids we know are in the gaps of the compacted ledger + for (Pair<MessageIdData, Long> gap : idsInGaps) { + PositionImpl pos = new PositionImpl(gap.getLeft().getLedgerId(), gap.getLeft().getEntryId()); + Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, lastEntryId, cache).get(), + Long.valueOf(gap.getRight())); + } + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services