This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d04a42  Read from compacted topic ledger if available and enabled 
(#1231)
8d04a42 is described below

commit 8d04a42e4cb0e862f0346ead98c228b881cfef53
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon Feb 19 18:12:00 2018 +0100

    Read from compacted topic ledger if available and enabled (#1231)
    
    * Read from compacted topic ledger if available and enabled
    
    If a topic has been compacted and the client has enabled reads from
    compacted topics, try to read from the compacted ledger if the cursor
    position lands before or within the range of message IDs in the
    compacted topic ledger. If the cursor position lands after the message
    IDs, in the compacted topic ledger, read from the cursor as normal.
    
    * fixup mocks in tests
---
 .../apache/bookkeeper/mledger/impl/EntryImpl.java  |   2 +-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |   4 +
 .../org/apache/pulsar/broker/PulsarService.java    |   5 +
 .../PersistentDispatcherSingleActiveConsumer.java  |   6 +-
 .../broker/service/persistent/PersistentTopic.java |   4 +-
 .../apache/pulsar/compaction/CompactedTopic.java   |   4 +
 .../pulsar/compaction/CompactedTopicImpl.java      | 119 +++++++-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   6 +-
 .../PersistentDispatcherFailoverConsumerTest.java  |   2 +
 .../pulsar/broker/service/PersistentTopicTest.java |   2 +
 .../pulsar/broker/service/ServerCnxTest.java       |   2 +
 .../apache/pulsar/compaction/CompactionTest.java   | 309 +++++++++++++++++++++
 12 files changed, 457 insertions(+), 8 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index 7371ebc..d1c6def 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -28,7 +28,7 @@ import io.netty.util.ReferenceCounted;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.mledger.Entry;
 
-final class EntryImpl extends AbstractReferenceCounted implements Entry, 
Comparable<EntryImpl>, ReferenceCounted {
+public final class EntryImpl extends AbstractReferenceCounted implements 
Entry, Comparable<EntryImpl>, ReferenceCounted {
 
     private static final Recycler<EntryImpl> RECYCLER = new 
Recycler<EntryImpl>() {
         @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index d51ea04..7eeb949 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -52,6 +52,10 @@ public class ManagedLedgerClientFactory implements Closeable 
{
         return managedLedgerFactory;
     }
 
+    public BookKeeper getBookKeeperClient() {
+        return bkClient;
+    }
+
     public void close() throws IOException {
         try {
             managedLedgerFactory.shutdown();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9a2b44a..dee8b5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -542,6 +543,10 @@ public class PulsarService implements AutoCloseable {
         return this.brokerService;
     }
 
+    public BookKeeper getBookKeeperClient() {
+        return managedLedgerClientFactory.getBookKeeperClient();
+    }
+
     public ManagedLedgerFactory getManagedLedgerFactory() {
         return managedLedgerClientFactory.getManagedLedgerFactory();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 40678f8..a26d862 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -319,7 +319,11 @@ public final class 
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
                 log.debug("[{}-{}] Schedule read of {} messages", name, 
consumer, messagesToRead);
             }
             havePendingRead = true;
-            cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer);
+            if (consumer.readCompacted()) {
+                topic.compactedTopic.asyncReadEntriesOrWait(cursor, 
messagesToRead, this, consumer);
+            } else {
+                cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer);
+            }
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Consumer buffer is full, pause reading", 
name, consumer);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5c82ef3..0ce3925 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -159,7 +159,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
 
     private final MessageDeduplication messageDeduplication;
-    private final CompactedTopic compactedTopic;
+    final CompactedTopic compactedTopic;
 
     // Whether messages published must be encrypted or not in this topic
     private volatile boolean isEncryptionRequired = false;
@@ -207,7 +207,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
 
         this.dispatchRateLimiter = new DispatchRateLimiter(this);
 
-        this.compactedTopic = new CompactedTopicImpl();
+        this.compactedTopic = new 
CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
 
         for (ManagedCursor cursor : ledger.getCursors()) {
             if (cursor.getName().startsWith(replicatorPrefix)) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 284cdbf..65e78a9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -18,8 +18,12 @@
  */
 package org.apache.pulsar.compaction;
 
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 
 public interface CompactedTopic {
     void newCompactedLedger(Position p, long compactedLedgerId);
+    void asyncReadEntriesOrWait(ManagedCursor cursor, int 
numberOfEntriesToRead,
+                                ReadEntriesCallback callback, Object ctx);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 0afccd1..f36e11f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -22,28 +22,87 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.ComparisonChain;
 
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CompactedTopicImpl implements CompactedTopic {
     final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
+    final static int DEFAULT_STARTPOINT_CACHE_SIZE = 100;
+
+    private final BookKeeper bk;
+
+    private PositionImpl compactionHorizon = null;
+    private CompletableFuture<CompactedTopicContext> compactedTopicContext = 
null;
+
+    public CompactedTopicImpl(BookKeeper bk) {
+        this.bk = bk;
+    }
+
+    @Override
+    public void newCompactedLedger(Position p, long compactedLedgerId) {
+        synchronized (this) {
+            compactionHorizon = (PositionImpl)p;
+            compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);
+        }
+    }
 
     @Override
-    public void newCompactedLedger(Position p, long compactedLedgerId) {}
+    public void asyncReadEntriesOrWait(ManagedCursor cursor, int 
numberOfEntriesToRead,
+                                       ReadEntriesCallback callback, Object 
ctx) {
+        synchronized (this) {
+            PositionImpl cursorPosition = (PositionImpl) 
cursor.getReadPosition();
+            if (compactionHorizon == null
+                || compactionHorizon.compareTo(cursorPosition) < 0) {
+                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, 
ctx);
+            } else {
+                compactedTopicContext.thenCompose(
+                        (context) -> {
+                            return findStartPoint(cursorPosition, 
context.ledger.getLastAddConfirmed(), context.cache)
+                                .thenCompose((startPoint) -> {
+                                        if (startPoint == 
NEWER_THAN_COMPACTED) {
+                                            
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
+                                            return 
CompletableFuture.completedFuture(null);
+                                        } else {
+                                            long endPoint = 
Math.min(context.ledger.getLastAddConfirmed(),
+                                                                     
startPoint + numberOfEntriesToRead);
+                                            return readEntries(context.ledger, 
startPoint, endPoint)
+                                                .thenAccept((entries) -> {
+                                                        Entry lastEntry = 
entries.get(entries.size() - 1);
+                                                        
cursor.seek(lastEntry.getPosition().getNext());
+                                                        
callback.readEntriesComplete(entries, ctx);
+                                                    });
+                                        }
+                                    });
+                                })
+                    .exceptionally((exception) -> {
+                            callback.readEntriesFailed(new 
ManagedLedgerException(exception), ctx);
+                            return null;
+                        });
+            }
+        }
+    }
 
     static CompletableFuture<Long> findStartPoint(PositionImpl p,
                                                   long lastEntryId,
@@ -107,6 +166,60 @@ public class CompactedTopicImpl implements CompactedTopic {
         return promise;
     }
 
+    private static CompletableFuture<CompactedTopicContext> 
openCompactedLedger(BookKeeper bk, long id) {
+        CompletableFuture<LedgerHandle> promise = new CompletableFuture<>();
+        bk.asyncOpenLedger(id,
+                           Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                           Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
+                           (rc, ledger, ctx) -> {
+                               if (rc != BKException.Code.OK) {
+                                   
promise.completeExceptionally(BKException.create(rc));
+                               } else {
+                                   promise.complete(ledger);
+                               }
+                           }, null);
+        return promise.thenApply((ledger) -> new CompactedTopicContext(
+                                         ledger, createCache(ledger, 
DEFAULT_STARTPOINT_CACHE_SIZE)));
+    }
+
+    private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, 
long from, long to) {
+        CompletableFuture<Enumeration<LedgerEntry>> promise = new 
CompletableFuture<>();
+
+        lh.asyncReadEntries(from, to,
+                            (rc, _lh, seq, ctx) -> {
+                                if (rc != BKException.Code.OK) {
+                                    
promise.completeExceptionally(BKException.create(rc));
+                                } else {
+                                    promise.complete(seq);
+                                }
+                            }, null);
+        return promise.thenApply(
+                (seq) -> {
+                    List<Entry> entries = new ArrayList<Entry>();
+                    while (seq.hasMoreElements()) {
+                        ByteBuf buf = seq.nextElement().getEntryBuffer();
+                        try (RawMessage m = 
RawMessageImpl.deserializeFrom(buf)) {
+                            
entries.add(EntryImpl.create(m.getMessageIdData().getLedgerId(),
+                                                         
m.getMessageIdData().getEntryId(),
+                                                         
m.getHeadersAndPayload()));
+                        } finally {
+                            buf.release();
+                        }
+                    }
+                    return entries;
+                });
+    }
+
+    static class CompactedTopicContext {
+        final LedgerHandle ledger;
+        final AsyncLoadingCache<Long,MessageIdData> cache;
+
+        CompactedTopicContext(LedgerHandle ledger, 
AsyncLoadingCache<Long,MessageIdData> cache) {
+            this.ledger = ledger;
+            this.cache = cache;
+        }
+    }
+
     private static int comparePositionAndMessageId(PositionImpl p, 
MessageIdData m) {
         return ComparisonChain.start()
             .compare(p.getLedgerId(), m.getLedgerId())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index a185a7e..af92e99 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -125,7 +125,7 @@ public abstract class MockedPulsarServiceBaseTest {
 
     protected final void init() throws Exception {
         mockZookKeeper = createMockZooKeeper();
-        mockBookKeeper = new NonClosableMockBookKeeper(new 
ClientConfiguration(), mockZookKeeper);
+        mockBookKeeper = createMockBookKeeper(mockZookKeeper);
 
         sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
 
@@ -208,6 +208,10 @@ public abstract class MockedPulsarServiceBaseTest {
         return zk;
     }
 
+    public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper 
zookeeper) throws Exception {
+        return new NonClosableMockBookKeeper(new ClientConfiguration(), 
zookeeper);
+    }
+
     // Prevent the MockBookKeeper instance from being closed when the broker 
is restarted within a test
     private static class NonClosableMockBookKeeper extends MockBookKeeper {
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 922b35b..2426997 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
@@ -111,6 +112,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
+        
doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index a8ec940..5a95800 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.mockito.Matchers.any;
@@ -142,6 +143,7 @@ public class PersistentTopicTest {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
+        
doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 0a5c154..6c82e68 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.mockito.Matchers.any;
@@ -153,6 +154,7 @@ public class ServerCnxTest {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
+        
doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
new file mode 100644
index 0000000..3001841
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class CompactionTest extends MockedPulsarServiceBaseTest {
+    private static final Logger log = 
LoggerFactory.getLogger(CompactionTest.class);
+
+    private ScheduledExecutorService compactionScheduler;
+    private BookKeeper bk;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        admin.clusters().createCluster("use",
+                new ClusterData("http://127.0.0.1:"; + BROKER_WEBSERVICE_PORT));
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+
+        compactionScheduler = Executors.newSingleThreadScheduledExecutor(
+                new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
+        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null);
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+
+        compactionScheduler.shutdownNow();
+    }
+
+    @Test
+    public void testCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        final int numMessages = 20;
+        final int maxKeys = 10;
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        Map<String, byte[]> expected = new HashMap<>();
+        List<Pair<String,byte[]>> all = new ArrayList<>();
+        Random r = new Random(0);
+
+        ConsumerConfiguration consumerConf = new 
ConsumerConfiguration().setReadCompacted(true);
+        pulsarClient.subscribe(topic, "sub1", consumerConf).close();
+
+        for (int j = 0; j < numMessages; j++) {
+            int keyIndex = r.nextInt(maxKeys);
+            String key = "key"+keyIndex;
+            byte[] data = ("my-message-" + key + "-" + j).getBytes();
+            producer.send(MessageBuilder.create()
+                          .setKey(key)
+                          .setContent(data).build());
+            expected.put(key, data);
+            all.add(Pair.of(key, data));
+        }
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf)) {
+            while (true) {
+                Message m = consumer.receive(2, TimeUnit.SECONDS);
+                Assert.assertEquals(expected.remove(m.getKey()), m.getData());
+                if (expected.isEmpty()) {
+                    break;
+                }
+            }
+            Assert.assertTrue(expected.isEmpty());
+        }
+
+        // can get full backlog if read compacted disabled
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf.setReadCompacted(false))) {
+            while (true) {
+                Message m = consumer.receive(2, TimeUnit.SECONDS);
+                Pair<String,byte[]> expectedMessage = all.remove(0);
+                Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
+                Assert.assertEquals(expectedMessage.getRight(), m.getData());
+                if (all.isEmpty()) {
+                    break;
+                }
+            }
+            Assert.assertTrue(all.isEmpty());
+        }
+    }
+
+    @Test
+    public void testReadCompactedBeforeCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        ConsumerConfiguration consumerConf = new 
ConsumerConfiguration().setReadCompacted(true);
+        pulsarClient.subscribe(topic, "sub1", consumerConf).close();
+
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content0".getBytes());
+
+            m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content1".getBytes());
+
+            m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+    }
+
+    @Test
+    public void testReadEntriesAfterCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        ConsumerConfiguration consumerConf = new 
ConsumerConfiguration().setReadCompacted(true);
+        pulsarClient.subscribe(topic, "sub1", consumerConf).close();
+
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content3".getBytes()).build());
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+
+            m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content3".getBytes());
+        }
+    }
+
+    @Test
+    public void testSeekEarliestAfterCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        ConsumerConfiguration consumerConf = new 
ConsumerConfiguration().setReadCompacted(true);
+
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf)) {
+            consumer.seek(MessageId.earliest);
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf.setReadCompacted(false))) {
+            consumer.seek(MessageId.earliest);
+
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content0".getBytes());
+
+            m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content1".getBytes());
+
+            m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+    }
+
+    @Test
+    public void testBrokerRestartAfterCompaction() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        ConsumerConfiguration consumerConf = new 
ConsumerConfiguration().setReadCompacted(true);
+        pulsarClient.subscribe(topic, "sub1", consumerConf).close();
+
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+
+        stopBroker();
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf)) {
+            consumer.receive();
+            Assert.fail("Shouldn't have been able to receive anything");
+        } catch (PulsarClientException e) {
+            // correct behaviour
+        }
+        startBroker();
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content2".getBytes());
+        }
+    }
+
+    @Test
+    public void testCompactEmptyTopic() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = pulsarClient.createProducer(topic, producerConf);
+
+        ConsumerConfiguration consumerConf = new 
ConsumerConfiguration().setReadCompacted(true);
+        pulsarClient.subscribe(topic, "sub1", consumerConf).close();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+
+        
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+
+        try (Consumer consumer = pulsarClient.subscribe(topic, "sub1", 
consumerConf)) {
+            Message m = consumer.receive();
+            Assert.assertEquals(m.getKey(), "key0");
+            Assert.assertEquals(m.getData(), "content0".getBytes());
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to