merlimat closed pull request #1540: Compaction sets read compacted flag when reading
URL: https://github.com/apache/incubator-pulsar/pull/1540
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork-based pull request once it has been
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 519b3b410c..4a914778fd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -54,6 +54,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, 
String subscription,
         consumerConfiguration.setSubscriptionName(subscription);
         consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
         
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
+        consumerConfiguration.setReadCompacted(true);
 
         consumer = new RawConsumerImpl(client, consumerConfiguration,
                                        consumerFuture);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index d3c2ea14ac..72b8b747f9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -222,11 +222,11 @@ public static MockZooKeeper createMockZooKeeper() throws 
Exception {
     }
 
     public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper 
zookeeper) throws Exception {
-        return new NonClosableMockBookKeeper(new ClientConfiguration(), 
zookeeper);
+        return spy(new NonClosableMockBookKeeper(new ClientConfiguration(), 
zookeeper));
     }
 
     // Prevent the MockBookKeeper instance from being closed when the broker 
is restarted within a test
-    private static class NonClosableMockBookKeeper extends MockBookKeeper {
+    public static class NonClosableMockBookKeeper extends MockBookKeeper {
 
         public NonClosableMockBookKeeper(ClientConfiguration conf, ZooKeeper 
zk) throws Exception {
             super(zk);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 22e74f21a9..8f64ef544d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -18,18 +18,26 @@
  */
 package org.apache.pulsar.compaction;
 
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
@@ -40,16 +48,20 @@
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class CompactionTest extends MockedPulsarServiceBaseTest {
+    private static final Logger log = 
LoggerFactory.getLogger(CompactionTest.class);
+
     private ScheduledExecutorService compactionScheduler;
     private BookKeeper bk;
 
@@ -580,4 +592,102 @@ public void testEmptyPayloadDeletes() throws Exception {
         }
     }
 
+
+    @Test
+    public void testCompactorReadsCompacted() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // capture opened ledgers
+        Set<Long> ledgersOpened = Sets.newConcurrentHashSet();
+        when(mockBookKeeper.newOpenLedgerOp()).thenAnswer(
+                (invocation) -> {
+                    OpenBuilder builder = 
(OpenBuilder)spy(invocation.callRealMethod());
+                    when(builder.withLedgerId(anyLong())).thenAnswer(
+                            (invocation2) -> {
+                                
ledgersOpened.add((Long)invocation2.getArguments()[0]);
+                                return invocation2.callRealMethod();
+                            });
+                    return builder;
+                });
+
+        // subscribe before sending anything, so that we get all messages in 
sub1
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
+
+        // create the topic on the broker
+        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create()) {
+            producerNormal.send(MessageBuilder.create()
+                                .setKey("key0")
+                                
.setContent("my-message-0".getBytes()).build());
+        }
+
+        // force ledger roll
+        pulsar.getBrokerService().getTopicReference(topic).get().close().get();
+
+        // write a message to avoid issue #1517
+        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create()) {
+            producerNormal.send(MessageBuilder.create()
+                                .setKey("key1")
+                                
.setContent("my-message-1".getBytes()).build());
+        }
+
+        // verify second ledger created
+        String managedLedgerName = 
((PersistentTopic)pulsar.getBrokerService().getTopicReference(topic).get())
+            .getManagedLedger().getName();
+        ManagedLedgerInfo info = 
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
+        Assert.assertEquals(info.ledgers.size(), 2);
+        Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have 
been opened
+
+        // compact the topic
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        // should have opened all except last to read
+        
Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+        ledgersOpened.clear();
+
+        // force broker to close resources for topic
+        pulsar.getBrokerService().getTopicReference(topic).get().close().get();
+
+        // write a message to avoid issue #1517
+        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create()) {
+            producerNormal.send(MessageBuilder.create()
+                                .setKey("key2")
+                                
.setContent("my-message-2".getBytes()).build());
+        }
+
+        info = 
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
+        Assert.assertEquals(info.ledgers.size(), 3);
+
+        // should only have opened the penultimate ledger to get stat
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
+        ledgersOpened.clear();
+
+        // compact the topic again
+        compactor.compact(topic).get();
+
+        // shouldn't have opened first ledger (already compacted), penultimate 
would have some uncompacted data.
+        // last ledger already open for writing
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+        
Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
+
+        // all three messages should be there when we read compacted
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()){
+            Message message1 = consumer.receive();
+            Assert.assertEquals(message1.getKey(), "key0");
+            Assert.assertEquals(new String(message1.getData()), 
"my-message-0");
+
+            Message message2 = consumer.receive();
+            Assert.assertEquals(message2.getKey(), "key1");
+            Assert.assertEquals(new String(message2.getData()), 
"my-message-1");
+
+            Message message3 = consumer.receive();
+            Assert.assertEquals(message3.getKey(), "key2");
+            Assert.assertEquals(new String(message3.getData()), 
"my-message-2");
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to