sijie closed pull request #2346: Issue #2330: change getTopicName in MultiTopicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/2346
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index da04534d94..fe37b69a50 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -131,6 +131,8 @@ private final SubscriptionInitialPosition subscriptionInitialPosition; private final ConnectionHandler connectionHandler; + private final String topicNameWithoutPartition; + enum SubscriptionMode { // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position @@ -203,6 +205,8 @@ NonPersistentAcknowledgmentGroupingTracker.of(); } + topicNameWithoutPartition = topicName.getPartitionedTopicName(); + grabCnx(); } @@ -1458,6 +1462,10 @@ void grabCnx() { this.connectionHandler.grabCnx(); } + public String getTopicNameWithoutPartition() { + return topicNameWithoutPartition; + } + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 799bec8e4d..f1cb9cf262 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -231,7 +231,8 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) { checkArgument(message instanceof MessageImpl); lock.writeLock().lock(); try { - TopicMessageImpl<T> topicMessage = new 
TopicMessageImpl<>(consumer.getTopic(), message); + TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>( + consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message); unAckedMessageTracker.add(topicMessage.getMessageId()); if (log.isDebugEnabled()) { @@ -370,7 +371,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() { } if (ackType == AckType.Cumulative) { - Consumer individualConsumer = consumers.get(topicMessageId.getTopicName()); + Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); if (individualConsumer != null) { MessageId innerId = topicMessageId.getInnerMessageId(); return individualConsumer.acknowledgeCumulativeAsync(innerId); @@ -378,7 +379,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() { return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); } } else { - ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicName()); + ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName()); MessageId innerId = topicMessageId.getInnerMessageId(); return consumer.doAcknowledge(innerId, ackType, properties) @@ -511,7 +512,7 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) { } removeExpiredMessagesFromQueue(messageIds); messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId) - .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, Collectors.toSet())) + .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet())) .forEach((topicName, messageIds1) -> consumers.get(topicName) .redeliverUnacknowledgedMessages(messageIds1.stream() diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index 071b804ffb..dd1b37dd33 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -18,20 +18,39 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; + import java.util.Objects; import org.apache.pulsar.client.api.MessageId; public class TopicMessageIdImpl implements MessageId { + + /** This topicPartitionName is get from ConsumerImpl, it contains partition part. */ + private final String topicPartitionName; private final String topicName; private final MessageId messageId; - TopicMessageIdImpl(String topicName, MessageId messageId) { - this.topicName = topicName; + TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) { this.messageId = messageId; + this.topicPartitionName = topicPartitionName; + this.topicName = topicName; } + /** + * Get the topic name without partition part of this message. + * @return the name of the topic on which this message was published + */ public String getTopicName() { - return topicName; + return this.topicName; + } + + /** + * Get the topic name which contains partition part for this message. 
+ * @return the topic name which contains Partition part + */ + public String getTopicPartitionName() { + return this.topicPartitionName; } public MessageId getInnerMessageId() { @@ -49,7 +68,7 @@ public boolean equals(Object obj) { return false; } TopicMessageIdImpl other = (TopicMessageIdImpl) obj; - return Objects.equals(topicName, other.topicName) + return Objects.equals(topicPartitionName, other.topicPartitionName) && Objects.equals(messageId, other.messageId); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 4f5ac13f7d..eae02b08c0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -21,32 +21,44 @@ import java.util.Map; import java.util.Optional; - import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.EncryptionContext; public class TopicMessageImpl<T> implements Message<T> { + /** This topicPartitionName is get from ConsumerImpl, it contains partition part. */ + private final String topicPartitionName; private final String topicName; private final Message<T> msg; private final TopicMessageIdImpl messageId; - TopicMessageImpl(String topicName, + TopicMessageImpl(String topicPartitionName, + String topicName, Message<T> msg) { + this.topicPartitionName = topicPartitionName; this.topicName = topicName; + this.msg = msg; - this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId()); + this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, msg.getMessageId()); } /** - * Get the topic name of this message. + * Get the topic name without partition part of this message. 
* @return the name of the topic on which this message was published */ public String getTopicName() { return topicName; } + /** + * Get the topic name which contains partition part for this message. + * @return the topic name which contains Partition part + */ + public String getTopicPartitionName() { + return topicPartitionName; + } + @Override public MessageId getMessageId() { return messageId; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java index dfc9257728..f500fda040 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java @@ -32,12 +32,12 @@ public int removeTopicMessages(String topicName) { int currentSetRemovedMsgCount = currentSet.removeIf(m -> { checkState(m instanceof TopicMessageIdImpl, "message should be of type TopicMessageIdImpl"); - return ((TopicMessageIdImpl)m).getTopicName().contains(topicName); + return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName); }); int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> { checkState(m instanceof TopicMessageIdImpl, "message should be of type TopicMessageIdImpl"); - return ((TopicMessageIdImpl)m).getTopicName().contains(topicName); + return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName); }); return currentSetRemovedMsgCount + oldSetRemovedMsgCount; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 78af44efba..d032c23004 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -18,7 +18,6 @@ */ package 
org.apache.pulsar.client.impl; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import org.testng.annotations.Test; @@ -122,12 +121,15 @@ public void testCompareDifferentType() { public void testMessageIdImplCompareToTopicMessageId() { MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new BatchMessageIdImpl(123L, 345L, 566, 789)); TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new BatchMessageIdImpl(123L, 345L, 567, 789)); TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new BatchMessageIdImpl(messageIdImpl)); assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than"); @@ -144,9 +146,11 @@ public void testBatchMessageIdImplCompareToTopicMessageId() { BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 567, 0); BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1); TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new MessageIdImpl(123L, 345L, 566)); TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new MessageIdImpl(123L, 345L, 567)); assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java index 20787269d3..84dfad1cb5 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java @@ -245,7 +245,7 @@ private void 
testSubscriptionInitialPosition(int numTopics) throws Exception { Message<String> m = consumer.receive(); int topicIdx; if (numTopics > 1) { - String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicName(); + String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicPartitionName(); String[] topicParts = StringUtils.split(topic, '-'); topicIdx = Integer.parseInt(topicParts[topicParts.length - 1]); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services