sijie closed pull request #2346: Issue #2330: change getTopicName in 
MultiTopicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/2346
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it is merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index da04534d94..fe37b69a50 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -131,6 +131,8 @@
     private final SubscriptionInitialPosition subscriptionInitialPosition;
     private final ConnectionHandler connectionHandler;
 
+    private final String topicNameWithoutPartition;
+
     enum SubscriptionMode {
         // Make the subscription to be backed by a durable cursor that will 
retain messages and persist the current
         // position
@@ -203,6 +205,8 @@
                 NonPersistentAcknowledgmentGroupingTracker.of();
         }
 
+        topicNameWithoutPartition = topicName.getPartitionedTopicName();
+
         grabCnx();
     }
 
@@ -1458,6 +1462,10 @@ void grabCnx() {
         this.connectionHandler.grabCnx();
     }
 
+    public String getTopicNameWithoutPartition() {
+        return topicNameWithoutPartition;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerImpl.class);
 
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 799bec8e4d..f1cb9cf262 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -231,7 +231,8 @@ private void messageReceived(ConsumerImpl<T> consumer, 
Message<T> message) {
         checkArgument(message instanceof MessageImpl);
         lock.writeLock().lock();
         try {
-            TopicMessageImpl<T> topicMessage = new 
TopicMessageImpl<>(consumer.getTopic(), message);
+            TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
+                consumer.getTopic(), consumer.getTopicNameWithoutPartition(), 
message);
             unAckedMessageTracker.add(topicMessage.getMessageId());
 
             if (log.isDebugEnabled()) {
@@ -370,7 +371,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() {
         }
 
         if (ackType == AckType.Cumulative) {
-            Consumer individualConsumer = 
consumers.get(topicMessageId.getTopicName());
+            Consumer individualConsumer = 
consumers.get(topicMessageId.getTopicPartitionName());
             if (individualConsumer != null) {
                 MessageId innerId = topicMessageId.getInnerMessageId();
                 return individualConsumer.acknowledgeCumulativeAsync(innerId);
@@ -378,7 +379,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() {
                 return FutureUtil.failedFuture(new 
PulsarClientException.NotConnectedException());
             }
         } else {
-            ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicName());
+            ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicPartitionName());
 
             MessageId innerId = topicMessageId.getInnerMessageId();
             return consumer.doAcknowledge(innerId, ackType, properties)
@@ -511,7 +512,7 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> 
messageIds) {
         }
         removeExpiredMessagesFromQueue(messageIds);
         messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
-            .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, 
Collectors.toSet()))
+            
.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, 
Collectors.toSet()))
             .forEach((topicName, messageIds1) ->
                 consumers.get(topicName)
                     .redeliverUnacknowledgedMessages(messageIds1.stream()
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 071b804ffb..dd1b37dd33 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -18,20 +18,39 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static com.google.common.base.Preconditions.checkState;
+import static 
org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
+
 import java.util.Objects;
 import org.apache.pulsar.client.api.MessageId;
 
 public class TopicMessageIdImpl implements MessageId {
+
+    /** This topicPartitionName is obtained from ConsumerImpl; it contains the 
partition part. */
+    private final String topicPartitionName;
     private final String topicName;
     private final MessageId messageId;
 
-    TopicMessageIdImpl(String topicName, MessageId messageId) {
-        this.topicName = topicName;
+    TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId 
messageId) {
         this.messageId = messageId;
+        this.topicPartitionName = topicPartitionName;
+        this.topicName = topicName;
     }
 
+    /**
+     * Get the topic name without partition part of this message.
+     * @return the name of the topic on which this message was published
+     */
     public String getTopicName() {
-        return topicName;
+        return this.topicName;
+    }
+
+    /**
+     * Get the topic name which contains partition part for this message.
+     * @return the topic name which contains Partition part
+     */
+    public String getTopicPartitionName() {
+        return this.topicPartitionName;
     }
 
     public MessageId getInnerMessageId() {
@@ -49,7 +68,7 @@ public boolean equals(Object obj) {
             return false;
         }
         TopicMessageIdImpl other = (TopicMessageIdImpl) obj;
-        return Objects.equals(topicName, other.topicName)
+        return Objects.equals(topicPartitionName, other.topicPartitionName)
             && Objects.equals(messageId, other.messageId);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 4f5ac13f7d..eae02b08c0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -21,32 +21,44 @@
 
 import java.util.Map;
 import java.util.Optional;
-
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
 
 public class TopicMessageImpl<T> implements Message<T> {
 
+    /** This topicPartitionName is obtained from ConsumerImpl; it contains the 
partition part. */
+    private final String topicPartitionName;
     private final String topicName;
     private final Message<T> msg;
     private final TopicMessageIdImpl messageId;
 
-    TopicMessageImpl(String topicName,
+    TopicMessageImpl(String topicPartitionName,
+                     String topicName,
                      Message<T> msg) {
+        this.topicPartitionName = topicPartitionName;
         this.topicName = topicName;
+
         this.msg = msg;
-        this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId());
+        this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, 
msg.getMessageId());
     }
 
     /**
-     * Get the topic name of this message.
+     * Get the topic name without partition part of this message.
      * @return the name of the topic on which this message was published
      */
     public String getTopicName() {
         return topicName;
     }
 
+    /**
+     * Get the topic name which contains partition part for this message.
+     * @return the topic name which contains Partition part
+     */
+    public String getTopicPartitionName() {
+        return topicPartitionName;
+    }
+
     @Override
     public MessageId getMessageId() {
         return messageId;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
index dfc9257728..f500fda040 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -32,12 +32,12 @@ public int removeTopicMessages(String topicName) {
             int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
                 checkState(m instanceof TopicMessageIdImpl,
                     "message should be of type TopicMessageIdImpl");
-                return 
((TopicMessageIdImpl)m).getTopicName().contains(topicName);
+                return 
((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
             });
             int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
                 checkState(m instanceof TopicMessageIdImpl,
                     "message should be of type TopicMessageIdImpl");
-                return 
((TopicMessageIdImpl)m).getTopicName().contains(topicName);
+                return 
((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
             });
 
             return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
index 78af44efba..d032c23004 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 import org.testng.annotations.Test;
@@ -122,12 +121,15 @@ public void testCompareDifferentType() {
     public void testMessageIdImplCompareToTopicMessageId() {
         MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
         TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
+            "test-topic-partition-0",
             "test-topic",
             new BatchMessageIdImpl(123L, 345L, 566, 789));
         TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
+            "test-topic-partition-0",
             "test-topic",
             new BatchMessageIdImpl(123L, 345L, 567, 789));
         TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl(
+            "test-topic-partition-0",
             "test-topic",
             new BatchMessageIdImpl(messageIdImpl));
         assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to 
be greater than");
@@ -144,9 +146,11 @@ public void 
testBatchMessageIdImplCompareToTopicMessageId() {
         BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 
567, 0);
         BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 
567, -1);
         TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
+            "test-topic-partition-0",
             "test-topic",
             new MessageIdImpl(123L, 345L, 566));
         TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
+            "test-topic-partition-0",
             "test-topic",
             new MessageIdImpl(123L, 345L, 567));
         assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to 
be greater than");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
index 20787269d3..84dfad1cb5 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -245,7 +245,7 @@ private void testSubscriptionInitialPosition(int numTopics) 
throws Exception {
                     Message<String> m = consumer.receive();
                     int topicIdx;
                     if (numTopics > 1) {
-                        String topic = ((TopicMessageIdImpl) 
m.getMessageId()).getTopicName();
+                        String topic = ((TopicMessageIdImpl) 
m.getMessageId()).getTopicPartitionName();
 
                         String[] topicParts = StringUtils.split(topic, '-');
                         topicIdx = 
Integer.parseInt(topicParts[topicParts.length - 1]);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to