This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8d04a42 Read from compacted topic ledger if available and enabled (#1231) 8d04a42 is described below commit 8d04a42e4cb0e862f0346ead98c228b881cfef53 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Mon Feb 19 18:12:00 2018 +0100 Read from compacted topic ledger if available and enabled (#1231) * Read from compacted topic ledger if available and enabled If a topic has been compacted and the client has enabled reads from compacted topics, try to read from the compacted ledger if the cursor position lands before or within the range of message IDs in the compacted topic ledger. If the cursor position lands after the message IDs in the compacted topic ledger, read from the cursor as normal. * fixup mocks in tests --- .../apache/bookkeeper/mledger/impl/EntryImpl.java | 2 +- .../pulsar/broker/ManagedLedgerClientFactory.java | 4 + .../org/apache/pulsar/broker/PulsarService.java | 5 + .../PersistentDispatcherSingleActiveConsumer.java | 6 +- .../broker/service/persistent/PersistentTopic.java | 4 +- .../apache/pulsar/compaction/CompactedTopic.java | 4 + .../pulsar/compaction/CompactedTopicImpl.java | 119 +++++++- .../broker/auth/MockedPulsarServiceBaseTest.java | 6 +- .../PersistentDispatcherFailoverConsumerTest.java | 2 + .../pulsar/broker/service/PersistentTopicTest.java | 2 + .../pulsar/broker/service/ServerCnxTest.java | 2 + .../apache/pulsar/compaction/CompactionTest.java | 309 +++++++++++++++++++++ 12 files changed, 457 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 7371ebc..d1c6def 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -28,7 +28,7 @@ import io.netty.util.ReferenceCounted; import 
org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.mledger.Entry; -final class EntryImpl extends AbstractReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted { +public final class EntryImpl extends AbstractReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted { private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() { @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index d51ea04..7eeb949 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -52,6 +52,10 @@ public class ManagedLedgerClientFactory implements Closeable { return managedLedgerFactory; } + public BookKeeper getBookKeeperClient() { + return bkClient; + } + public void close() throws IOException { try { managedLedgerFactory.shutdown(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 9a2b44a..dee8b5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -33,6 +33,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; +import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; @@ -542,6 +543,10 @@ public class PulsarService implements AutoCloseable { return this.brokerService; } + public BookKeeper getBookKeeperClient() { + return 
managedLedgerClientFactory.getBookKeeperClient(); + } + public ManagedLedgerFactory getManagedLedgerFactory() { return managedLedgerClientFactory.getManagedLedgerFactory(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 40678f8..a26d862 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -319,7 +319,11 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead); } havePendingRead = true; - cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer); + if (consumer.readCompacted()) { + topic.compactedTopic.asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); + } else { + cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer); + } } else { if (log.isDebugEnabled()) { log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 5c82ef3..0ce3925 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -159,7 +159,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { public static final int MESSAGE_RATE_BACKOFF_MS = 1000; private final MessageDeduplication messageDeduplication; - private final CompactedTopic compactedTopic; + final CompactedTopic 
compactedTopic; // Whether messages published must be encrypted or not in this topic private volatile boolean isEncryptionRequired = false; @@ -207,7 +207,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { this.dispatchRateLimiter = new DispatchRateLimiter(this); - this.compactedTopic = new CompactedTopicImpl(); + this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient()); for (ManagedCursor cursor : ledger.getCursors()) { if (cursor.getName().startsWith(replicatorPrefix)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java index 284cdbf..65e78a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -18,8 +18,12 @@ */ package org.apache.pulsar.compaction; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; public interface CompactedTopic { void newCompactedLedger(Position p, long compactedLedgerId); + void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead, + ReadEntriesCallback callback, Object ctx); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 0afccd1..f36e11f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -22,28 +22,87 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.ComparisonChain; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import 
java.util.Enumeration; +import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; +import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CompactedTopicImpl implements CompactedTopic { final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL; + final static int DEFAULT_STARTPOINT_CACHE_SIZE = 100; + + private final BookKeeper bk; + + private PositionImpl compactionHorizon = null; + private CompletableFuture<CompactedTopicContext> compactedTopicContext = null; + + public CompactedTopicImpl(BookKeeper bk) { + this.bk = bk; + } + + @Override + public void newCompactedLedger(Position p, long compactedLedgerId) { + synchronized (this) { + compactionHorizon = (PositionImpl)p; + compactedTopicContext = openCompactedLedger(bk, compactedLedgerId); + } + } @Override - public void newCompactedLedger(Position p, long compactedLedgerId) {} + public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead, + ReadEntriesCallback callback, Object ctx) { + synchronized (this) { + PositionImpl cursorPosition = (PositionImpl) cursor.getReadPosition(); + if (compactionHorizon == null + || 
compactionHorizon.compareTo(cursorPosition) < 0) { + cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx); + } else { + compactedTopicContext.thenCompose( + (context) -> { + return findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache) + .thenCompose((startPoint) -> { + if (startPoint == NEWER_THAN_COMPACTED) { + cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx); + return CompletableFuture.completedFuture(null); + } else { + long endPoint = Math.min(context.ledger.getLastAddConfirmed(), + startPoint + numberOfEntriesToRead); + return readEntries(context.ledger, startPoint, endPoint) + .thenAccept((entries) -> { + Entry lastEntry = entries.get(entries.size() - 1); + cursor.seek(lastEntry.getPosition().getNext()); + callback.readEntriesComplete(entries, ctx); + }); + } + }); + }) + .exceptionally((exception) -> { + callback.readEntriesFailed(new ManagedLedgerException(exception), ctx); + return null; + }); + } + } + } static CompletableFuture<Long> findStartPoint(PositionImpl p, long lastEntryId, @@ -107,6 +166,60 @@ public class CompactedTopicImpl implements CompactedTopic { return promise; } + private static CompletableFuture<CompactedTopicContext> openCompactedLedger(BookKeeper bk, long id) { + CompletableFuture<LedgerHandle> promise = new CompletableFuture<>(); + bk.asyncOpenLedger(id, + Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, + Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD, + (rc, ledger, ctx) -> { + if (rc != BKException.Code.OK) { + promise.completeExceptionally(BKException.create(rc)); + } else { + promise.complete(ledger); + } + }, null); + return promise.thenApply((ledger) -> new CompactedTopicContext( + ledger, createCache(ledger, DEFAULT_STARTPOINT_CACHE_SIZE))); + } + + private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, long to) { + CompletableFuture<Enumeration<LedgerEntry>> promise = new CompletableFuture<>(); + + lh.asyncReadEntries(from, to, + (rc, _lh, 
seq, ctx) -> { + if (rc != BKException.Code.OK) { + promise.completeExceptionally(BKException.create(rc)); + } else { + promise.complete(seq); + } + }, null); + return promise.thenApply( + (seq) -> { + List<Entry> entries = new ArrayList<Entry>(); + while (seq.hasMoreElements()) { + ByteBuf buf = seq.nextElement().getEntryBuffer(); + try (RawMessage m = RawMessageImpl.deserializeFrom(buf)) { + entries.add(EntryImpl.create(m.getMessageIdData().getLedgerId(), + m.getMessageIdData().getEntryId(), + m.getHeadersAndPayload())); + } finally { + buf.release(); + } + } + return entries; + }); + } + + static class CompactedTopicContext { + final LedgerHandle ledger; + final AsyncLoadingCache<Long,MessageIdData> cache; + + CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache<Long,MessageIdData> cache) { + this.ledger = ledger; + this.cache = cache; + } + } + private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) { return ComparisonChain.start() .compare(p.getLedgerId(), m.getLedgerId()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index a185a7e..af92e99 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -125,7 +125,7 @@ public abstract class MockedPulsarServiceBaseTest { protected final void init() throws Exception { mockZookKeeper = createMockZooKeeper(); - mockBookKeeper = new NonClosableMockBookKeeper(new ClientConfiguration(), mockZookKeeper); + mockBookKeeper = createMockBookKeeper(mockZookKeeper); sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor(); @@ -208,6 +208,10 @@ public abstract class MockedPulsarServiceBaseTest { return zk; } + public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper) 
throws Exception { + return new NonClosableMockBookKeeper(new ClientConfiguration(), zookeeper); + } + // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test private static class NonClosableMockBookKeeper extends MockBookKeeper { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 922b35b..2426997 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; @@ -111,6 +112,7 @@ public class PersistentDispatcherFailoverConsumerTest { ZooKeeper mockZk = createMockZooKeeper(); doReturn(mockZk).when(pulsar).getZkClient(); + doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient(); configCacheService = mock(ConfigurationCacheService.class); @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index a8ec940..5a95800 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import 
static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.mockito.Matchers.any; @@ -142,6 +143,7 @@ public class PersistentTopicTest { ZooKeeper mockZk = createMockZooKeeper(); doReturn(mockZk).when(pulsar).getZkClient(); + doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient(); configCacheService = mock(ConfigurationCacheService.class); @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 0a5c154..6c82e68 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.mockito.Matchers.any; @@ -153,6 +154,7 @@ public class ServerCnxTest { ZooKeeper mockZk = createMockZooKeeper(); doReturn(mockZk).when(pulsar).getZkClient(); + doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient(); configCacheService = mock(ConfigurationCacheService.class); @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java new file mode 100644 index 0000000..3001841 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PropertyAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import 
org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class CompactionTest extends MockedPulsarServiceBaseTest { + private static final Logger log = LoggerFactory.getLogger(CompactionTest.class); + + private ScheduledExecutorService compactionScheduler; + private BookKeeper bk; + + @BeforeMethod + @Override + public void setup() throws Exception { + super.internalSetup(); + + admin.clusters().createCluster("use", + new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT)); + admin.properties().createProperty("my-property", + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-property/use/my-ns"); + + compactionScheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); + bk = pulsar.getBookKeeperClientFactory().create(this.conf, null); + } + + @AfterMethod + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + + compactionScheduler.shutdownNow(); + } + + @Test + public void testCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + final int numMessages = 20; + final int maxKeys = 10; + + ProducerConfiguration producerConf = new ProducerConfiguration(); + Producer producer = pulsarClient.createProducer(topic, producerConf); + + Map<String, byte[]> expected = new HashMap<>(); + List<Pair<String,byte[]>> all = new ArrayList<>(); + Random r = new Random(0); + + ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true); + pulsarClient.subscribe(topic, "sub1", consumerConf).close(); + + for (int j = 0; j < numMessages; j++) { + int keyIndex = r.nextInt(maxKeys); + String key = "key"+keyIndex; + byte[] data = ("my-message-" + key + "-" + j).getBytes(); + producer.send(MessageBuilder.create() + .setKey(key) + .setContent(data).build()); + 
expected.put(key, data); + all.add(Pair.of(key, data)); + } + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + // consumer with readCompacted enabled only get compacted entries + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) { + while (true) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertEquals(expected.remove(m.getKey()), m.getData()); + if (expected.isEmpty()) { + break; + } + } + Assert.assertTrue(expected.isEmpty()); + } + + // can get full backlog if read compacted disabled + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf.setReadCompacted(false))) { + while (true) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + Pair<String,byte[]> expectedMessage = all.remove(0); + Assert.assertEquals(expectedMessage.getLeft(), m.getKey()); + Assert.assertEquals(expectedMessage.getRight(), m.getData()); + if (all.isEmpty()) { + break; + } + } + Assert.assertTrue(all.isEmpty()); + } + } + + @Test + public void testReadCompactedBeforeCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + ProducerConfiguration producerConf = new ProducerConfiguration(); + Producer producer = pulsarClient.createProducer(topic, producerConf); + + ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true); + pulsarClient.subscribe(topic, "sub1", consumerConf).close(); + + producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build()); + producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build()); + producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build()); + + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), 
"content0".getBytes()); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content1".getBytes()); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content2".getBytes()); + } + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content2".getBytes()); + } + } + + @Test + public void testReadEntriesAfterCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + ProducerConfiguration producerConf = new ProducerConfiguration(); + Producer producer = pulsarClient.createProducer(topic, producerConf); + + ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true); + pulsarClient.subscribe(topic, "sub1", consumerConf).close(); + + producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build()); + producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build()); + producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build()); + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + producer.send(MessageBuilder.create().setKey("key0").setContent("content3".getBytes()).build()); + + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content2".getBytes()); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content3".getBytes()); + } + } + + @Test + public void 
testSeekEarliestAfterCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + ProducerConfiguration producerConf = new ProducerConfiguration(); + Producer producer = pulsarClient.createProducer(topic, producerConf); + + ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true); + + producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build()); + producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build()); + producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build()); + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) { + consumer.seek(MessageId.earliest); + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content2".getBytes()); + } + + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf.setReadCompacted(false))) { + consumer.seek(MessageId.earliest); + + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content0".getBytes()); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content1".getBytes()); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content2".getBytes()); + } + } + + @Test + public void testBrokerRestartAfterCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + ProducerConfiguration producerConf = new ProducerConfiguration(); + Producer producer = pulsarClient.createProducer(topic, producerConf); + + ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true); + 
pulsarClient.subscribe(topic, "sub1", consumerConf).close(); + + producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build()); + producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build()); + producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build()); + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content2".getBytes()); + } + + stopBroker(); + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) { + consumer.receive(); + Assert.fail("Shouldn't have been able to receive anything"); + } catch (PulsarClientException e) { + // correct behaviour + } + startBroker(); + + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getData(), "content2".getBytes()); + } + } + + @Test + public void testCompactEmptyTopic() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + ProducerConfiguration producerConf = new ProducerConfiguration(); + Producer producer = pulsarClient.createProducer(topic, producerConf); + + ConsumerConfiguration consumerConf = new ConsumerConfiguration().setReadCompacted(true); + pulsarClient.subscribe(topic, "sub1", consumerConf).close(); + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + + producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build()); + + try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", consumerConf)) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + 
Assert.assertEquals(m.getData(), "content0".getBytes()); + } + } +} -- To stop receiving notification emails like this one, please contact mme...@apache.org.