This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8688b67  Issue #2330: change getTopicName in MultiTopicsConsumer 
(#2346)
8688b67 is described below

commit 8688b6733177ee4a48d805df18a1d82865cf7320
Author: Jia Zhai <jiaz...@users.noreply.github.com>
AuthorDate: Tue Aug 28 02:38:41 2018 +0800

    Issue #2330: change getTopicName in MultiTopicsConsumer (#2346)
    
    * change getTopicName in MultiTopicsConsumer
    
    * change following sijie's comments
    
    * keep both topicName and topicPartitonName in consumer to avoid new string
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  8 +++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  9 ++++----
 .../pulsar/client/impl/TopicMessageIdImpl.java     | 27 ++++++++++++++++++----
 .../pulsar/client/impl/TopicMessageImpl.java       | 20 ++++++++++++----
 .../client/impl/UnAckedTopicMessageTracker.java    |  4 ++--
 .../pulsar/client/impl/MessageIdCompareToTest.java |  6 ++++-
 .../tests/integration/semantics/SemanticsTest.java |  2 +-
 7 files changed, 60 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index da04534..fe37b69 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -131,6 +131,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private final SubscriptionInitialPosition subscriptionInitialPosition;
     private final ConnectionHandler connectionHandler;
 
+    private final String topicNameWithoutPartition;
+
     enum SubscriptionMode {
         // Make the subscription to be backed by a durable cursor that will 
retain messages and persist the current
         // position
@@ -203,6 +205,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 NonPersistentAcknowledgmentGroupingTracker.of();
         }
 
+        topicNameWithoutPartition = topicName.getPartitionedTopicName();
+
         grabCnx();
     }
 
@@ -1458,6 +1462,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         this.connectionHandler.grabCnx();
     }
 
+    public String getTopicNameWithoutPartition() {
+        return topicNameWithoutPartition;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerImpl.class);
 
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 799bec8..f1cb9cf 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -231,7 +231,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         checkArgument(message instanceof MessageImpl);
         lock.writeLock().lock();
         try {
-            TopicMessageImpl<T> topicMessage = new 
TopicMessageImpl<>(consumer.getTopic(), message);
+            TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
+                consumer.getTopic(), consumer.getTopicNameWithoutPartition(), 
message);
             unAckedMessageTracker.add(topicMessage.getMessageId());
 
             if (log.isDebugEnabled()) {
@@ -370,7 +371,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
 
         if (ackType == AckType.Cumulative) {
-            Consumer individualConsumer = 
consumers.get(topicMessageId.getTopicName());
+            Consumer individualConsumer = 
consumers.get(topicMessageId.getTopicPartitionName());
             if (individualConsumer != null) {
                 MessageId innerId = topicMessageId.getInnerMessageId();
                 return individualConsumer.acknowledgeCumulativeAsync(innerId);
@@ -378,7 +379,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 return FutureUtil.failedFuture(new 
PulsarClientException.NotConnectedException());
             }
         } else {
-            ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicName());
+            ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicPartitionName());
 
             MessageId innerId = topicMessageId.getInnerMessageId();
             return consumer.doAcknowledge(innerId, ackType, properties)
@@ -511,7 +512,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
         removeExpiredMessagesFromQueue(messageIds);
         messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
-            .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, 
Collectors.toSet()))
+            
.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, 
Collectors.toSet()))
             .forEach((topicName, messageIds1) ->
                 consumers.get(topicName)
                     .redeliverUnacknowledgedMessages(messageIds1.stream()
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 071b804..dd1b37d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -18,20 +18,39 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static com.google.common.base.Preconditions.checkState;
+import static 
org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
+
 import java.util.Objects;
 import org.apache.pulsar.client.api.MessageId;
 
 public class TopicMessageIdImpl implements MessageId {
+
+    /** This topicPartitionName is get from ConsumerImpl, it contains 
partition part. */
+    private final String topicPartitionName;
     private final String topicName;
     private final MessageId messageId;
 
-    TopicMessageIdImpl(String topicName, MessageId messageId) {
-        this.topicName = topicName;
+    TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId 
messageId) {
         this.messageId = messageId;
+        this.topicPartitionName = topicPartitionName;
+        this.topicName = topicName;
     }
 
+    /**
+     * Get the topic name without partition part of this message.
+     * @return the name of the topic on which this message was published
+     */
     public String getTopicName() {
-        return topicName;
+        return this.topicName;
+    }
+
+    /**
+     * Get the topic name which contains partition part for this message.
+     * @return the topic name which contains Partition part
+     */
+    public String getTopicPartitionName() {
+        return this.topicPartitionName;
     }
 
     public MessageId getInnerMessageId() {
@@ -49,7 +68,7 @@ public class TopicMessageIdImpl implements MessageId {
             return false;
         }
         TopicMessageIdImpl other = (TopicMessageIdImpl) obj;
-        return Objects.equals(topicName, other.topicName)
+        return Objects.equals(topicPartitionName, other.topicPartitionName)
             && Objects.equals(messageId, other.messageId);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 4f5ac13..eae02b0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -21,32 +21,44 @@ package org.apache.pulsar.client.impl;
 
 import java.util.Map;
 import java.util.Optional;
-
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
 
 public class TopicMessageImpl<T> implements Message<T> {
 
+    /** This topicPartitionName is get from ConsumerImpl, it contains 
partition part. */
+    private final String topicPartitionName;
     private final String topicName;
     private final Message<T> msg;
     private final TopicMessageIdImpl messageId;
 
-    TopicMessageImpl(String topicName,
+    TopicMessageImpl(String topicPartitionName,
+                     String topicName,
                      Message<T> msg) {
+        this.topicPartitionName = topicPartitionName;
         this.topicName = topicName;
+
         this.msg = msg;
-        this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId());
+        this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, 
msg.getMessageId());
     }
 
     /**
-     * Get the topic name of this message.
+     * Get the topic name without partition part of this message.
      * @return the name of the topic on which this message was published
      */
     public String getTopicName() {
         return topicName;
     }
 
+    /**
+     * Get the topic name which contains partition part for this message.
+     * @return the topic name which contains Partition part
+     */
+    public String getTopicPartitionName() {
+        return topicPartitionName;
+    }
+
     @Override
     public MessageId getMessageId() {
         return messageId;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
index dfc9257..f500fda 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -32,12 +32,12 @@ public class UnAckedTopicMessageTracker extends 
UnAckedMessageTracker {
             int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
                 checkState(m instanceof TopicMessageIdImpl,
                     "message should be of type TopicMessageIdImpl");
-                return 
((TopicMessageIdImpl)m).getTopicName().contains(topicName);
+                return 
((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
             });
             int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
                 checkState(m instanceof TopicMessageIdImpl,
                     "message should be of type TopicMessageIdImpl");
-                return 
((TopicMessageIdImpl)m).getTopicName().contains(topicName);
+                return 
((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
             });
 
             return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
index 78af44e..d032c23 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 import org.testng.annotations.Test;
@@ -122,12 +121,15 @@ public class MessageIdCompareToTest  {
     public void testMessageIdImplCompareToTopicMessageId() {
         MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
         TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
+            "test-topic-partition-0",
             "test-topic",
             new BatchMessageIdImpl(123L, 345L, 566, 789));
         TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
+            "test-topic-partition-0",
             "test-topic",
             new BatchMessageIdImpl(123L, 345L, 567, 789));
         TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl(
+            "test-topic-partition-0",
             "test-topic",
             new BatchMessageIdImpl(messageIdImpl));
         assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to 
be greater than");
@@ -144,9 +146,11 @@ public class MessageIdCompareToTest  {
         BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 
567, 0);
         BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 
567, -1);
         TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
+            "test-topic-partition-0",
             "test-topic",
             new MessageIdImpl(123L, 345L, 566));
         TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
+            "test-topic-partition-0",
             "test-topic",
             new MessageIdImpl(123L, 345L, 567));
         assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to 
be greater than");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
index 2078726..84dfad1 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -245,7 +245,7 @@ public class SemanticsTest extends PulsarClusterTestBase {
                     Message<String> m = consumer.receive();
                     int topicIdx;
                     if (numTopics > 1) {
-                        String topic = ((TopicMessageIdImpl) 
m.getMessageId()).getTopicName();
+                        String topic = ((TopicMessageIdImpl) 
m.getMessageId()).getTopicPartitionName();
 
                         String[] topicParts = StringUtils.split(topic, '-');
                         topicIdx = 
Integer.parseInt(topicParts[topicParts.length - 1]);

Reply via email to