This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new dc404e0 Compression support for compaction (#1604) dc404e0 is described below commit dc404e0d7f8fc4812abe2a3aca40c9b77ab31098 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Wed Apr 18 23:47:58 2018 -0700 Compression support for compaction (#1604) * Compression support for compaction Messages in a batch are all compressed together, so compaction has to handle them as a special case. This change adds that special handling and verifies that compaction works with compression for batched and non-batched messages. * Remove incorrect comments (came through copy paste) * Fix refcount issue in test --- .../pulsar/client/impl/RawBatchConverter.java | 37 ++++++++-- .../apache/pulsar/compaction/CompactionTest.java | 80 ++++++++++++++++++++++ 2 files changed, 112 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index f83cbf5..37fceda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -35,8 +35,11 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,13 +64,18 @@ public class RawBatchConverter { ByteBuf payload = msg.getHeadersAndPayload(); MessageMetadata metadata = 
Commands.parseMessageMetadata(payload); int batchSize = metadata.getNumMessagesInBatch(); + + CompressionType compressionType = metadata.getCompression(); + CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); + int uncompressedSize = metadata.getUncompressedSize(); + ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize); metadata.recycle(); List<ImmutablePair<MessageId,String>> idsAndKeys = new ArrayList<>(); for (int i = 0; i < batchSize; i++) { SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata.newBuilder(); - ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload, + ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, singleMessageMetadataBuilder, 0, batchSize); MessageId id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(), @@ -80,6 +88,7 @@ public class RawBatchConverter { singleMessageMetadataBuilder.recycle(); singleMessagePayload.release(); } + uncompressedPayload.release(); return idsAndKeys; } @@ -98,6 +107,12 @@ public class RawBatchConverter { ByteBuf payload = msg.getHeadersAndPayload(); MessageMetadata metadata = Commands.parseMessageMetadata(payload); ByteBuf batchBuffer = PooledByteBufAllocator.DEFAULT.buffer(payload.capacity()); + + CompressionType compressionType = metadata.getCompression(); + CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); + + int uncompressedSize = metadata.getUncompressedSize(); + ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize); try { int batchSize = metadata.getNumMessagesInBatch(); int messagesRetained = 0; @@ -105,7 +120,7 @@ public class RawBatchConverter { SingleMessageMetadata.Builder emptyMetadataBuilder = SingleMessageMetadata.newBuilder().setCompactedOut(true); for (int i = 0; i < batchSize; i++) { SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata.newBuilder(); - 
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload, + ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, singleMessageMetadataBuilder, 0, batchSize); MessageId id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(), @@ -131,10 +146,22 @@ public class RawBatchConverter { emptyMetadataBuilder.recycle(); if (messagesRetained > 0) { + int newUncompressedSize = batchBuffer.readableBytes(); + ByteBuf compressedPayload = codec.encode(batchBuffer); + + MessageMetadata.Builder metadataBuilder = metadata.toBuilder(); + metadataBuilder.setUncompressedSize(newUncompressedSize); + MessageMetadata newMetadata = metadataBuilder.build(); + ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, - metadata, batchBuffer); - return Optional.of(new RawMessageImpl(msg.getMessageIdData(), - metadataAndPayload)); + newMetadata, compressedPayload); + Optional<RawMessage> result = Optional.of(new RawMessageImpl(msg.getMessageIdData(), + metadataAndPayload)); + metadataBuilder.recycle(); + newMetadata.recycle(); + metadataAndPayload.release(); + compressedPayload.release(); + return result; } else { return Optional.empty(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 43f8bb2..9a774c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; 
import org.apache.pulsar.client.api.MessageId; @@ -729,4 +730,83 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(new String(message3.getData()), "my-message-2"); } } + + @Test + public void testCompactCompressedNoBatch() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + // subscribe before sending anything, so that we get all messages + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe().close(); + + try (Producer producer = pulsarClient.newProducer().topic(topic) + .compressionType(CompressionType.LZ4).enableBatching(false).create()) { + producer.sendAsync(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-1".getBytes()).build()); + producer.sendAsync(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-2".getBytes()).build()); + producer.sendAsync(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-3".getBytes()).build()).get(); + } + + // compact the topic + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()){ + Message message1 = consumer.receive(); + Assert.assertEquals(message1.getKey(), "key1"); + Assert.assertEquals(new String(message1.getData()), "my-message-1"); + + Message message2 = consumer.receive(); + Assert.assertEquals(message2.getKey(), "key2"); + Assert.assertEquals(new String(message2.getData()), "my-message-3"); + } + } + + @Test + public void testCompactCompressedBatching() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + // subscribe before sending anything, so that we get all messages + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe().close(); + + try (Producer producer = 
pulsarClient.newProducer().topic(topic) + .compressionType(CompressionType.LZ4) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + producer.sendAsync(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-1".getBytes()).build()); + producer.sendAsync(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-2".getBytes()).build()); + producer.sendAsync(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-3".getBytes()).build()).get(); + } + + // compact the topic + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()){ + Message message1 = consumer.receive(); + Assert.assertEquals(message1.getKey(), "key1"); + Assert.assertEquals(new String(message1.getData()), "my-message-1"); + + Message message2 = consumer.receive(); + Assert.assertEquals(message2.getKey(), "key2"); + Assert.assertEquals(new String(message2.getData()), "my-message-3"); + } + } + } -- To stop receiving notification emails like this one, please contact si...@apache.org.