This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6541720  Utility to rebatch a message (#1391)
6541720 is described below

commit 65417201d0b65058a4a4d81eb5dfe12e6b769b8b
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu Mar 22 17:21:53 2018 +0100

    Utility to rebatch a message (#1391)
    
    * Utility to rebatch a message
    
    When compacting a batched message, we need to take the message, break
    it into its constituent submessages, select the submessages we wish to
    keep, and rebuild the batched message. In other words rebatching. To
    maintain the same batched message ids, the submessages which are not
    selected are replaced with dummy messages with the compactedOut flag
    set to true.
    
    This patch contains a utility method to do this rebatching, along with
    another utility to extract the ids and keys from a batch. These will
    be used to add batching support to compaction in a later patch.
    
    * Avoid double release of m1
---
 .../pulsar/client/impl/RawBatchConverter.java      | 110 ++++++++++++++++-----
 .../apache/pulsar/client/impl/RawReaderTest.java   |  96 +++++++++---------
 2 files changed, 137 insertions(+), 69 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index b80f216..fa62982 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -18,14 +18,22 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import org.apache.pulsar.client.api.RawMessage;
+import java.util.Optional;
+import java.util.function.BiPredicate;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
@@ -36,45 +44,97 @@ import org.slf4j.LoggerFactory;
 public class RawBatchConverter {
     private static final Logger log = 
LoggerFactory.getLogger(RawBatchConverter.class);
 
-    private static MessageMetadata mergeMetadata(MessageMetadata batchMetadata,
-                                                 SingleMessageMetadata.Builder 
singleMessageMetadata) {
-        // is uncompressed size correct?
-        return batchMetadata.toBuilder()
-            .setNumMessagesInBatch(1)
-            .setUncompressedSize(singleMessageMetadata.getPayloadSize())
-            .addAllProperties(singleMessageMetadata.getPropertiesList())
-            .setPartitionKey(singleMessageMetadata.getPartitionKey()).build();
-    }
-
     public static boolean isBatch(RawMessage msg) {
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
         int batchSize = metadata.getNumMessagesInBatch();
-        return batchSize > 0;
+        return batchSize > 1;
     }
 
-    public static Collection<RawMessage> explodeBatch(RawMessage msg) throws 
IOException {
-        assert(msg.getMessageIdData().getBatchIndex() == -1);
+    public static List<ImmutablePair<MessageId,String>> 
extractIdsAndKeys(RawMessage msg)
+            throws IOException {
+        checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
 
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
         int batchSize = metadata.getNumMessagesInBatch();
-        List<RawMessage> exploded = new ArrayList<>();
+        metadata.recycle();
+
+        List<ImmutablePair<MessageId,String>> idsAndKeys = new ArrayList<>();
+
         for (int i = 0; i < batchSize; i++) {
             SingleMessageMetadata.Builder singleMessageMetadataBuilder = 
SingleMessageMetadata.newBuilder();
             ByteBuf singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(payload,
                                                                                
     singleMessageMetadataBuilder,
                                                                                
     0, batchSize);
-
-            // serializeMetadataAndPayload takes ownership of the the payload
-            ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(
-                    Commands.ChecksumType.Crc32c, mergeMetadata(metadata, 
singleMessageMetadataBuilder),
-                    singleMessagePayload);
-            exploded.add(new 
RawMessageImpl(msg.getMessageIdData().toBuilder().setBatchIndex(i).build(),
-                                            metadataAndPayload));
-            metadataAndPayload.release();
+            MessageId id = new 
BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
+                                                  
msg.getMessageIdData().getEntryId(),
+                                                  
msg.getMessageIdData().getPartition(),
+                                                  i);
+            if (!singleMessageMetadataBuilder.getCompactedOut()) {
+                idsAndKeys.add(ImmutablePair.of(id, 
singleMessageMetadataBuilder.getPartitionKey()));
+            }
+            singleMessageMetadataBuilder.recycle();
             singleMessagePayload.release();
         }
-        return exploded;
+        return idsAndKeys;
+    }
+
+    /**
+     * Takes a batched message and a filter, and returns a message with 
only the submessages
+     * which match the filter. Returns an empty optional if no messages match.
+     *
+     * This takes ownership of the passed-in message, and if the returned 
optional is not empty,
+     * the ownership of that message is returned also.
+     */
+    public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+                                                      BiPredicate<String, 
MessageId> filter)
+            throws IOException {
+        checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
+
+        ByteBuf payload = msg.getHeadersAndPayload();
+        MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+        ByteBuf batchBuffer = 
PooledByteBufAllocator.DEFAULT.buffer(payload.capacity());
+        try {
+            int batchSize = metadata.getNumMessagesInBatch();
+            int messagesRetained = 0;
+
+            SingleMessageMetadata.Builder emptyMetadataBuilder = 
SingleMessageMetadata.newBuilder().setCompactedOut(true);
+            for (int i = 0; i < batchSize; i++) {
+                SingleMessageMetadata.Builder singleMessageMetadataBuilder = 
SingleMessageMetadata.newBuilder();
+                ByteBuf singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(payload,
+                                                                               
         singleMessageMetadataBuilder,
+                                                                               
         0, batchSize);
+                String key = singleMessageMetadataBuilder.getPartitionKey();
+                MessageId id = new 
BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
+                                                      
msg.getMessageIdData().getEntryId(),
+                                                      
msg.getMessageIdData().getPartition(),
+                                                      i);
+                if (filter.test(key, id)) {
+                    messagesRetained++;
+                    
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
+                                                                      
singleMessagePayload, batchBuffer);
+                } else {
+                    
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadataBuilder,
+                                                                      
Unpooled.EMPTY_BUFFER, batchBuffer);
+                }
+                singleMessageMetadataBuilder.recycle();
+                singleMessagePayload.release();
+            }
+            emptyMetadataBuilder.recycle();
+
+            if (messagesRetained > 0) {
+                ByteBuf metadataAndPayload = 
Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
+                                                                               
   metadata, batchBuffer);
+                return Optional.of(new RawMessageImpl(msg.getMessageIdData(),
+                                                      metadataAndPayload));
+            } else {
+                return Optional.empty();
+            }
+        } finally {
+            batchBuffer.release();
+            metadata.recycle();
+            msg.close();
+        }
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index b63d9c4..e061e3e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -28,9 +28,9 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
 
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageBuilder;
@@ -74,12 +74,10 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
-    private Set<String> publishMessagesBase(String topic, int count, boolean 
batching) throws Exception {
+    private Set<String> publishMessages(String topic, int count) throws 
Exception {
         Set<String> keys = new HashSet<>();
 
-        try (Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(count)
-                
.enableBatching(batching).batchingMaxMessages(BATCH_MAX_MESSAGES)
-                .batchingMaxPublishDelay(Long.MAX_VALUE, 
TimeUnit.DAYS).create()) {
+        try (Producer<byte[]> producer = 
pulsarClient.newProducer().maxPendingMessages(count).topic(topic).create()) {
             Future<?> lastFuture = null;
             for (int i = 0; i < count; i++) {
                 String key = "key"+i;
@@ -94,14 +92,6 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
         return keys;
     }
 
-    private Set<String> publishMessages(String topic, int count) throws 
Exception {
-        return publishMessagesBase(topic, count, false);
-    }
-
-    private Set<String> publishMessagesInBatches(String topic, int count) 
throws Exception {
-        return publishMessagesBase(topic, count, true);
-    }
-
     public static String extractKey(RawMessage m) throws Exception {
         ByteBuf headersAndPayload = m.getHeadersAndPayload();
         MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
@@ -239,45 +229,63 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testBatching() throws Exception {
-        int numMessages = BATCH_MAX_MESSAGES * 5;
+    public void testBatchingExtractKeysAndIds() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-raw-topic";
 
-        Set<String> keys = publishMessagesInBatches(topic, numMessages);
+        try (Producer producer = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+                
.enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, 
TimeUnit.HOURS).create()) {
+            producer.sendAsync(MessageBuilder.create()
+                               
.setKey("key1").setContent("my-content-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               
.setKey("key2").setContent("my-content-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               
.setKey("key3").setContent("my-content-3".getBytes()).build()).get();
+        }
 
         RawReader reader = RawReader.create(pulsarClient, topic, 
subscription).get();
+        try (RawMessage m = reader.readNextAsync().get()) {
+            List<ImmutablePair<MessageId,String>> idsAndKeys = 
RawBatchConverter.extractIdsAndKeys(m);
 
-        Consumer<RawMessage> consumer = new Consumer<RawMessage>() {
-                BatchMessageIdImpl lastId = new BatchMessageIdImpl(-1, -1, -1, 
-1);
+            Assert.assertEquals(idsAndKeys.size(), 3);
 
-                @Override
-                public void accept(RawMessage m) {
-                    try {
-                        Assert.assertTrue(keys.remove(extractKey(m)));
-                        Assert.assertTrue(m.getMessageId() instanceof 
BatchMessageIdImpl);
-                        BatchMessageIdImpl id = 
(BatchMessageIdImpl)m.getMessageId();
+            // assert message ids are in correct order
+            
Assert.assertTrue(idsAndKeys.get(0).getLeft().compareTo(idsAndKeys.get(1).getLeft())
 < 0);
+            
Assert.assertTrue(idsAndKeys.get(1).getLeft().compareTo(idsAndKeys.get(2).getLeft())
 < 0);
 
-                        // id should be greater than lastId
-                        Assert.assertEquals(id.compareTo(lastId), 1);
-                    } catch (Exception e) {
-                        Assert.fail("Error checking message", e);
-                    }
-                }
-            };
-        MessageId lastMessageId = reader.getLastMessageIdAsync().get();
-        while (true) {
-            try (RawMessage m = reader.readNextAsync().get()) {
-                if (RawBatchConverter.isBatch(m)) {
-                    RawBatchConverter.explodeBatch(m).forEach(consumer);
-                } else {
-                    consumer.accept(m);
-                }
-                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
-                    break;
-                }
-            }
+            // assert keys are as expected
+            Assert.assertEquals(idsAndKeys.get(0).getRight(), "key1");
+            Assert.assertEquals(idsAndKeys.get(1).getRight(), "key2");
+            Assert.assertEquals(idsAndKeys.get(2).getRight(), "key3");
+        } finally {
+            reader.closeAsync().get();
+        }
+    }
+
+    @Test
+    public void testBatchingRebatch() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-raw-topic";
+
+        try (Producer producer = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+                
.enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, 
TimeUnit.HOURS).create()) {
+            producer.sendAsync(MessageBuilder.create()
+                               
.setKey("key1").setContent("my-content-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               
.setKey("key2").setContent("my-content-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               
.setKey("key3").setContent("my-content-3".getBytes()).build()).get();
+        }
+
+        RawReader reader = RawReader.create(pulsarClient, topic, 
subscription).get();
+        try {
+            RawMessage m1 = reader.readNextAsync().get();
+            RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> 
key.equals("key2")).get();
+            List<ImmutablePair<MessageId,String>> idsAndKeys = 
RawBatchConverter.extractIdsAndKeys(m2);
+            Assert.assertEquals(idsAndKeys.size(), 1);
+            Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
+            m2.close();
+        } finally {
+            reader.closeAsync().get();
         }
-        Assert.assertTrue(keys.isEmpty());
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to