This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dc404e0  Compression support for compaction (#1604)
dc404e0 is described below

commit dc404e0d7f8fc4812abe2a3aca40c9b77ab31098
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Apr 18 23:47:58 2018 -0700

    Compression support for compaction (#1604)
    
    * Compression support for compaction
    
    Messages in a batch are all compressed together, so compaction has to
    handle them as a special case. This change adds that special handling
    and verifies that compaction works with compression for batched and
    non-batched messages.
    
    * Remove incorrect comments (came through copy paste)
    
    * Fix refcount issue in test
---
 .../pulsar/client/impl/RawBatchConverter.java      | 37 ++++++++--
 .../apache/pulsar/compaction/CompactionTest.java   | 80 ++++++++++++++++++++++
 2 files changed, 112 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index f83cbf5..37fceda 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -35,8 +35,11 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,13 +64,18 @@ public class RawBatchConverter {
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
         int batchSize = metadata.getNumMessagesInBatch();
+
+        CompressionType compressionType = metadata.getCompression();
+        CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
+        int uncompressedSize = metadata.getUncompressedSize();
+        ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
         metadata.recycle();
 
         List<ImmutablePair<MessageId,String>> idsAndKeys = new ArrayList<>();
 
         for (int i = 0; i < batchSize; i++) {
             SingleMessageMetadata.Builder singleMessageMetadataBuilder = 
SingleMessageMetadata.newBuilder();
-            ByteBuf singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(payload,
+            ByteBuf singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
                                                                                
     singleMessageMetadataBuilder,
                                                                                
     0, batchSize);
             MessageId id = new 
BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
@@ -80,6 +88,7 @@ public class RawBatchConverter {
             singleMessageMetadataBuilder.recycle();
             singleMessagePayload.release();
         }
+        uncompressedPayload.release();
         return idsAndKeys;
     }
 
@@ -98,6 +107,12 @@ public class RawBatchConverter {
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
         ByteBuf batchBuffer = 
PooledByteBufAllocator.DEFAULT.buffer(payload.capacity());
+
+        CompressionType compressionType = metadata.getCompression();
+        CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
+
+        int uncompressedSize = metadata.getUncompressedSize();
+        ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
         try {
             int batchSize = metadata.getNumMessagesInBatch();
             int messagesRetained = 0;
@@ -105,7 +120,7 @@ public class RawBatchConverter {
             SingleMessageMetadata.Builder emptyMetadataBuilder = 
SingleMessageMetadata.newBuilder().setCompactedOut(true);
             for (int i = 0; i < batchSize; i++) {
                 SingleMessageMetadata.Builder singleMessageMetadataBuilder = 
SingleMessageMetadata.newBuilder();
-                ByteBuf singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(payload,
+                ByteBuf singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
                                                                                
         singleMessageMetadataBuilder,
                                                                                
         0, batchSize);
                 MessageId id = new 
BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
@@ -131,10 +146,22 @@ public class RawBatchConverter {
             emptyMetadataBuilder.recycle();
 
             if (messagesRetained > 0) {
+                int newUncompressedSize = batchBuffer.readableBytes();
+                ByteBuf compressedPayload = codec.encode(batchBuffer);
+
+                MessageMetadata.Builder metadataBuilder = metadata.toBuilder();
+                metadataBuilder.setUncompressedSize(newUncompressedSize);
+                MessageMetadata newMetadata = metadataBuilder.build();
+
                 ByteBuf metadataAndPayload = 
Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
-                                                                               
   metadata, batchBuffer);
-                return Optional.of(new RawMessageImpl(msg.getMessageIdData(),
-                                                      metadataAndPayload));
+                                                                               
   newMetadata, compressedPayload);
+                Optional<RawMessage> result = Optional.of(new 
RawMessageImpl(msg.getMessageIdData(),
+                                                                             
metadataAndPayload));
+                metadataBuilder.recycle();
+                newMetadata.recycle();
+                metadataAndPayload.release();
+                compressedPayload.release();
+                return result;
             } else {
                 return Optional.empty();
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 43f8bb2..9a774c9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
@@ -729,4 +730,83 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertEquals(new String(message3.getData()), 
"my-message-2");
         }
     }
+
+    @Test
+    public void testCompactCompressedNoBatch() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // subscribe before sending anything, so that we get all messages
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+            .readCompacted(true).subscribe().close();
+
+        try (Producer producer = pulsarClient.newProducer().topic(topic)
+                
.compressionType(CompressionType.LZ4).enableBatching(false).create()) {
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key1")
+                               .setContent("my-message-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2")
+                               .setContent("my-message-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2")
+                               
.setContent("my-message-3".getBytes()).build()).get();
+        }
+
+        // compact the topic
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()){
+            Message message1 = consumer.receive();
+            Assert.assertEquals(message1.getKey(), "key1");
+            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+
+            Message message2 = consumer.receive();
+            Assert.assertEquals(message2.getKey(), "key2");
+            Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
+        }
+    }
+
+    @Test
+    public void testCompactCompressedBatching() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // subscribe before sending anything, so that we get all messages
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+            .readCompacted(true).subscribe().close();
+
+        try (Producer producer = pulsarClient.newProducer().topic(topic)
+                .compressionType(CompressionType.LZ4)
+                .maxPendingMessages(3)
+                .enableBatching(true)
+                .batchingMaxMessages(3)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key1")
+                               .setContent("my-message-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2")
+                               .setContent("my-message-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2")
+                               
.setContent("my-message-3".getBytes()).build()).get();
+        }
+
+        // compact the topic
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()){
+            Message message1 = consumer.receive();
+            Assert.assertEquals(message1.getKey(), "key1");
+            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+
+            Message message2 = consumer.receive();
+            Assert.assertEquals(message2.getKey(), "key2");
+            Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
+        }
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to