This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f97a8f7  [compaction] make topic compaction works with partitioned 
topic (#2367)
f97a8f7 is described below

commit f97a8f7d136ddc609d01ce54e20f45e9949b04df
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Tue Aug 14 11:02:24 2018 -0700

    [compaction] make topic compaction works with partitioned topic (#2367)
    
    * [compaction] make topic compaction works with partitioned topic
    
     ### Motivation
    
    Topic compaction doesn't work with partitioned topics.
    
     ### Changes
    
    - make `RawReaderImpl` and `ReaderImpl` return messages with the partition index
    - make the broker service `Consumer` deliver `MessageIdData` with the partition index
    - add an integration test to ensure compaction works with partitioned topics
---
 .../java/org/apache/pulsar/broker/service/Consumer.java  |  9 +++++++--
 .../org/apache/pulsar/client/impl/RawReaderImpl.java     | 16 ++++++++++++----
 .../org/apache/pulsar/compaction/TwoPhaseCompactor.java  |  8 ++++----
 .../PersistentDispatcherFailoverConsumerTest.java        |  2 +-
 .../java/org/apache/pulsar/client/impl/ReaderImpl.java   |  4 +++-
 5 files changed, 27 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c3befa5..4c54018 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -73,6 +73,7 @@ public class Consumer {
     private final String appId;
     private AuthenticationDataSource authenticationData;
     private final String topicName;
+    private final int partitionIdx;
     private final InitialPosition subscriptionInitialPosition;
 
     private final long consumerId;
@@ -119,6 +120,7 @@ public class Consumer {
         this.subscription = subscription;
         this.subType = subType;
         this.topicName = topicName;
+        this.partitionIdx = TopicName.getPartitionIndex(topicName);
         this.consumerId = consumerId;
         this.priorityLevel = priorityLevel;
         this.readCompacted = readCompacted;
@@ -239,8 +241,11 @@ public class Consumer {
                 Entry entry = entries.get(i);
                 PositionImpl pos = (PositionImpl) entry.getPosition();
                 MessageIdData.Builder messageIdBuilder = 
MessageIdData.newBuilder();
-                MessageIdData messageId = 
messageIdBuilder.setLedgerId(pos.getLedgerId()).setEntryId(pos.getEntryId())
-                        .build();
+                MessageIdData messageId = messageIdBuilder
+                    .setLedgerId(pos.getLedgerId())
+                    .setEntryId(pos.getEntryId())
+                    .setPartition(partitionIdx)
+                    .build();
 
                 ByteBuf metadataAndPayload = entry.getDataBuffer();
                 // increment ref-count of data and release at the end of 
process: so, we can get chance to call entry.release
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index e768c3e..ae1a4db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,8 +103,15 @@ public class RawReaderImpl implements RawReader {
 
         RawConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<byte[]> conf,
                 CompletableFuture<Consumer<byte[]>> consumerFuture) {
-            super(client, conf.getSingleTopic(), conf, 
client.externalExecutorProvider().getExecutor(), -1,
-                    consumerFuture, SubscriptionMode.Durable, 
MessageId.earliest, Schema.BYTES);
+            super(client,
+                conf.getSingleTopic(),
+                conf,
+                client.externalExecutorProvider().getExecutor(),
+                TopicName.getPartitionIndex(conf.getSingleTopic()),
+                consumerFuture,
+                SubscriptionMode.Durable,
+                MessageId.earliest,
+                Schema.BYTES);
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
@@ -172,8 +180,8 @@ public class RawReaderImpl implements RawReader {
         @Override
         void messageReceived(MessageIdData messageId, ByteBuf 
headersAndPayload, ClientCnx cnx) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Received raw message: {}/{}", topic, 
subscription,
-                          messageId.getLedgerId(), messageId.getEntryId());
+                log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, 
subscription,
+                          messageId.getEntryId(), messageId.getLedgerId(), 
messageId.getPartition());
             }
             incomingRawMessages.add(
                     new RawMessageAndCnx(new RawMessageImpl(messageId, 
headersAndPayload), cnx));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index b4ee68a..cc3f710 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -146,11 +146,11 @@ public class TwoPhaseCompactor extends Compactor {
 
     private void scheduleTimeout(CompletableFuture<RawMessage> future) {
         Future<?> timeout = scheduler.schedule(() -> {
-                future.completeExceptionally(new TimeoutException("Timeout"));
-            }, 10, TimeUnit.SECONDS);
+            future.completeExceptionally(new TimeoutException("Timeout"));
+        }, 10, TimeUnit.SECONDS);
         future.whenComplete((res, exception) -> {
-                timeout.cancel(true);
-            });
+            timeout.cancel(true);
+        });
     }
 
     private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index ccd32e1..6f16733 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -579,7 +579,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     private Consumer createConsumer(int priority, int permit, boolean blocked, 
int id) throws Exception {
         Consumer consumer =
-                new Consumer(null, SubType.Shared, null, id, priority, ""+id, 
5000,
+                new Consumer(null, SubType.Shared, "test-topic", id, priority, 
""+id, 5000,
                         serverCnx, "appId", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest);
         try {
             consumer.flowPermits(permit);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 00f8af0..aafd125 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
 
 public class ReaderImpl<T> implements Reader<T> {
 
@@ -82,8 +83,9 @@ public class ReaderImpl<T> implements Reader<T> {
             
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
         }
 
+        final int partitionIdx = 
TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, 
readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                -1, consumerFuture, SubscriptionMode.NonDurable, 
readerConfiguration.getStartMessageId(), schema);
+                partitionIdx, consumerFuture, SubscriptionMode.NonDurable, 
readerConfiguration.getStartMessageId(), schema);
     }
 
     @Override

Reply via email to