This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 346f831  Compaction allows keyless messages to pass through (#1514)
346f831 is described below

commit 346f8312484cc4389d96f2aeefc0be6393c4fcc2
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Apr 6 23:35:56 2018 +0200

    Compaction allows keyless messages to pass through (#1514)
    
    If a message has no key, it's impossible to know whether a later message
    supersedes it, so in this case it should simply be passed through
    compaction (i.e. it should appear when reading from a compacted
    topic).
---
 .../pulsar/client/impl/RawBatchConverter.java      |  7 ++-
 .../pulsar/compaction/TwoPhaseCompactor.java       | 14 ++++-
 .../apache/pulsar/compaction/CompactionTest.java   | 70 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index fa62982..4e628bc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -105,12 +105,15 @@ public class RawBatchConverter {
                 ByteBuf singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(payload,
                                                                                
         singleMessageMetadataBuilder,
                                                                                
         0, batchSize);
-                String key = singleMessageMetadataBuilder.getPartitionKey();
                 MessageId id = new 
BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
                                                       
msg.getMessageIdData().getEntryId(),
                                                       
msg.getMessageIdData().getPartition(),
                                                       i);
-                if (filter.test(key, id)) {
+                if (!singleMessageMetadataBuilder.hasPartitionKey()) {
+                    messagesRetained++;
+                    
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
+                                                                      
singleMessagePayload, batchBuffer);
+                } else if 
(filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)) {
                     messagesRetained++;
                     
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
                                                                       
singleMessagePayload, batchBuffer);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index f12870b..fbad47e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -123,7 +123,9 @@ public class TwoPhaseCompactor extends Compactor {
                             }
                         } else {
                             String key = extractKey(m);
-                            latestForKey.put(key, id);
+                            if (key != null) {
+                                latestForKey.put(key, id);
+                            }
                         }
 
                         if (id.compareTo(lastMessageId) == 0) {
@@ -213,7 +215,9 @@ public class TwoPhaseCompactor extends Compactor {
                         }
                     } else {
                         String key = extractKey(m);
-                        if (latestForKey.get(key).equals(id)) {
+                        if (key == null) { // pass through messages without a 
key
+                            messageToAdd = Optional.of(m);
+                        } else if (latestForKey.get(key).equals(id)) {
                             messageToAdd = Optional.of(m);
                         } else {
                             m.close();
@@ -306,7 +310,11 @@ public class TwoPhaseCompactor extends Compactor {
     private static String extractKey(RawMessage m) {
         ByteBuf headersAndPayload = m.getHeadersAndPayload();
         MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
-        return msgMetadata.getPartitionKey();
+        if (msgMetadata.hasPartitionKey()) {
+            return msgMetadata.getPartitionKey();
+        } else {
+            return null;
+        }
     }
 
     private static class PhaseOneResult {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index af39e12..a0f0f97 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -442,4 +442,74 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertEquals(new String(message.getData()), "my-message-4");
         }
     }
+
+    @Test
+    public void testKeyLessMessagesPassThrough() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // subscribe before sending anything, so that we get all messages
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+            .readCompacted(true).subscribe().close();
+
+        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create();
+             Producer producerBatch = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+                .enableBatching(true).batchingMaxMessages(3)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+            producerNormal.sendAsync(MessageBuilder.create()
+                                     
.setContent("my-message-1".getBytes()).build()).get();
+
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    
.setContent("my-message-2".getBytes()).build());
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key1")
+                                    
.setContent("my-message-3".getBytes()).build());
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key1")
+                                    
.setContent("my-message-4".getBytes()).build()).get();
+
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key2")
+                                    
.setContent("my-message-5".getBytes()).build());
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key2")
+                                    
.setContent("my-message-6".getBytes()).build());
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    
.setContent("my-message-7".getBytes()).build()).get();
+
+            producerNormal.sendAsync(MessageBuilder.create()
+                                     
.setContent("my-message-8".getBytes()).build()).get();
+        }
+
+        // compact the topic
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()){
+            Message message1 = consumer.receive();
+            Assert.assertFalse(message1.hasKey());
+            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+
+            Message message2 = consumer.receive();
+            Assert.assertFalse(message2.hasKey());
+            Assert.assertEquals(new String(message2.getData()), 
"my-message-2");
+
+            Message message3 = consumer.receive();
+            Assert.assertEquals(message3.getKey(), "key1");
+            Assert.assertEquals(new String(message3.getData()), 
"my-message-4");
+
+            Message message4 = consumer.receive();
+            Assert.assertEquals(message4.getKey(), "key2");
+            Assert.assertEquals(new String(message4.getData()), 
"my-message-6");
+
+            Message message5 = consumer.receive();
+            Assert.assertFalse(message5.hasKey());
+            Assert.assertEquals(new String(message5.getData()), 
"my-message-7");
+
+            Message message6 = consumer.receive();
+            Assert.assertFalse(message6.hasKey());
+            Assert.assertEquals(new String(message6.getData()), 
"my-message-8");
+        }
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to