This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 1dd9c43 Delete PartitionedConsumerImpl, use TopicsConsumerImpl instead (#1365) 1dd9c43 is described below commit 1dd9c43e8e266d3957177354f592015c6f89c6d6 Author: Jia Zhai <zhaiji...@gmail.com> AuthorDate: Tue Apr 3 22:03:55 2018 -0700 Delete PartitionedConsumerImpl, use TopicsConsumerImpl instead (#1365) * delete partitionedConsumer, use topicsConsumer instead * change following comments * rebase master, rename TopicsConsumerImpl to MultiTopicsConsumerImpl * avoid dup calling getPartitionedTopicMetadata * rebase master, fix test error --- .../broker/service/PersistentFailoverE2ETest.java | 7 +- .../apache/pulsar/client/impl/MessageIdTest.java | 7 +- .../client/impl/PatternTopicsConsumerImplTest.java | 76 +-- .../PerMessageUnAcknowledgedRedeliveryTest.java | 6 +- .../pulsar/client/impl/TopicsConsumerImplTest.java | 46 +- .../apache/pulsar/client/impl/ConsumerImpl.java | 2 +- ...sumerImpl.java => MultiTopicsConsumerImpl.java} | 283 ++++++----- .../client/impl/PartitionedConsumerImpl.java | 553 --------------------- ...pl.java => PatternMultiTopicsConsumerImpl.java} | 24 +- .../pulsar/client/impl/PulsarClientImpl.java | 8 +- .../pulsar/client/impl/TopicMessageImpl.java | 6 +- 11 files changed, 263 insertions(+), 755 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 6176821..08cd7f6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; @@ -323,13 +324,11 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { final String subName = "sub1"; final int numMsgs = 100; Set<String> uniqueMessages = new HashSet<>(); - admin.persistentTopics().createPartitionedTopic(topicName, numPartitions); ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .subscriptionType(SubscriptionType.Failover); - // 1. two consumers on the same subscription ActiveInactiveListenerEvent listener1 = new ActiveInactiveListenerEvent(); ActiveInactiveListenerEvent listener2 = new ActiveInactiveListenerEvent(); @@ -374,7 +373,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { } totalMessages++; consumer1.acknowledge(msg); - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId()); receivedPtns.add(msgId.getPartitionIndex()); } @@ -391,7 +390,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { } totalMessages++; consumer2.acknowledge(msg); - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId()); receivedPtns.add(msgId.getPartitionIndex()); } assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java index 4a9912d..6d6fc92 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java @@ -208,7 +208,8 @@ public class MessageIdTest extends BrokerTestBase { Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully"); for (int i = 0; i < numberOfMessages; i++) { - MessageId messageId = consumer.receive().getMessageId(); + MessageId topicMessageId = consumer.receive().getMessageId(); + MessageId messageId = ((TopicMessageIdImpl)topicMessageId).getInnerMessageId(); log.info("Message ID Received = " + messageId); Assert.assertTrue(messageIds.remove(messageId), "Failed to receive Message"); } @@ -247,7 +248,9 @@ public class MessageIdTest extends BrokerTestBase { Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully"); for (int i = 0; i < numberOfMessages; i++) { - Assert.assertTrue(messageIds.remove(consumer.receive().getMessageId()), "Failed to receive Message"); + MessageId topicMessageId = consumer.receive().getMessageId(); + MessageId messageId = ((TopicMessageIdImpl)topicMessageId).getInnerMessageId(); + Assert.assertTrue(messageIds.remove(messageId), "Failed to receive Message"); } log.info("Message IDs = " + messageIds); Assert.assertEquals(messageIds.size(), 0, "Not all messages received successfully"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 4f9cc70..b3e74f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -161,13 +161,13 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { .subscribe(); // 4. verify consumer get methods, to get right number of partitions and topics. - assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); - List<String> topics = ((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics(); - List<ConsumerImpl<byte[]>> consumers = ((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers(); + assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); + List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics(); + List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers(); assertEquals(topics.size(), 6); assertEquals(consumers.size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); topics.forEach(topic -> log.debug("topic: {}", topic)); consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); @@ -175,7 +175,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { IntStream.range(0, topics.size()).forEach(index -> assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); - ((PatternTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); + ((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); // 5. produce data for (int i = 0; i < totalMessages / 3; i++) { @@ -235,8 +235,8 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6); - List<String> addedNames = PatternTopicsConsumerImpl.topicsListsMinus(newNames, oldNames); - List<String> removedNames = PatternTopicsConsumerImpl.topicsListsMinus(oldNames, newNames); + List<String> addedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames); + List<String> removedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames); assertTrue(addedNames.size() == 2 && addedNames.contains(topicName5) && @@ -246,21 +246,21 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { removedNames.contains(topicName2)); // totally 2 different list, should return content of first lists. - List<String> addedNames2 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames); + List<String> addedNames2 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames); assertTrue(addedNames2.size() == 2 && addedNames2.contains(topicName5) && addedNames2.contains(topicName6)); // 2 same list, should return empty list. - List<String> addedNames3 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames); + List<String> addedNames3 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames); assertEquals(addedNames3.size(), 0); // empty list minus: addedNames2.size = 2, addedNames3.size = 0 - List<String> addedNames4 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3); + List<String> addedNames4 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3); assertTrue(addedNames4.size() == addedNames2.size()); addedNames4.forEach(name -> assertTrue(addedNames2.contains(name))); - List<String> addedNames5 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2); + List<String> addedNames5 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2); assertEquals(addedNames5.size(), 0); } @@ -290,10 +290,10 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { .subscribe(); // 3. verify consumer get methods, to get 0 number of partitions and topics. - assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 0); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 0); + assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 0); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 0); // 4. create producer String messagePredicate = "my-message-" + key + "-"; @@ -310,15 +310,15 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { // 5. call recheckTopics to subscribe each added topics above log.debug("recheck topics change"); - PatternTopicsConsumerImpl<byte[]> consumer1 = ((PatternTopicsConsumerImpl<byte[]>) consumer); + PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer); consumer1.run(consumer1.getRecheckPatternTimeout()); Thread.sleep(100); // 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3. - assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); + assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); // 7. produce data @@ -384,13 +384,13 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof PatternTopicsConsumerImpl); + assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl); // 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3 - assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); + assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); // 5. produce data to topic 1,2,3; verify should receive all the message for (int i = 0; i < totalMessages / 3; i++) { @@ -419,12 +419,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { // 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4 log.debug("recheck topics change"); - PatternTopicsConsumerImpl<byte[]> consumer1 = ((PatternTopicsConsumerImpl<byte[]>) consumer); + PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer); consumer1.run(consumer1.getRecheckPatternTimeout()); Thread.sleep(100); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 10); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 10); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 4); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 10); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 10); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 4); // 8. produce data to topic3 and topic4, verify should receive all the message for (int i = 0; i < totalMessages / 2; i++) { @@ -487,13 +487,13 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof PatternTopicsConsumerImpl); + assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl); // 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3 - assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); + assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); // 5. produce data to topic 1,2,3; verify should receive all the message for (int i = 0; i < totalMessages / 3; i++) { @@ -521,12 +521,12 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3 log.debug("recheck topics change"); - PatternTopicsConsumerImpl<byte[]> consumer1 = ((PatternTopicsConsumerImpl<byte[]>) consumer); + PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer); consumer1.run(consumer1.getRecheckPatternTimeout()); Thread.sleep(100); - assertEquals(((PatternTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2); - assertEquals(((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2); - assertEquals(((PatternTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 1); + assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2); + assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2); + assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 1); // 8. produce data to topic2, verify should receive all the message for (int i = 0; i < totalMessages; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java index 7a94cc2..053cb5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java @@ -338,7 +338,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase { } private static long getUnackedMessagesCountInPartitionedConsumer(Consumer<byte[]> c) { - PartitionedConsumerImpl<byte[]> pc = (PartitionedConsumerImpl<byte[]>) c; + MultiTopicsConsumerImpl<byte[]> pc = (MultiTopicsConsumerImpl<byte[]>) c; return pc.getUnAckedMessageTracker().size() + pc.getConsumers().stream().mapToLong(consumer -> consumer.getUnAckedMessageTracker().size()).sum(); } @@ -405,8 +405,8 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase { assertEquals(received, 5); // 7. Simulate ackTimeout - ((PartitionedConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle(); - ((PartitionedConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); + ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle(); + ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); // 8. producer publish more messages for (int i = 0; i < totalMessages / 3; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 952dfac..6aa574d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -116,10 +116,10 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof TopicsConsumerImpl); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); - List<String> topics = ((TopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); - List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl) consumer).getConsumers(); + List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); + List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers(); topics.forEach(topic -> log.info("topic: {}", topic)); consumers.forEach(c -> log.info("consumer: {}", c.getTopic())); @@ -127,7 +127,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { IntStream.range(0, 6).forEach(index -> assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); - assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 3); + assertTrue(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 3); consumer.unsubscribe(); consumer.close(); @@ -167,7 +167,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof TopicsConsumerImpl); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); // 3. producer publish messages for (int i = 0; i < totalMessages / 3; i++) { @@ -228,7 +228,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof TopicsConsumerImpl); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); // Asynchronously produce messages List<Future<MessageId>> futures = Lists.newArrayList(); @@ -307,7 +307,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof TopicsConsumerImpl); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); // 3. producer publish messages for (int i = 0; i < totalMessages / 3; i++) { @@ -323,7 +323,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { log.debug("Consumer received : " + new String(message.getData())); message = consumer.receive(500, TimeUnit.MILLISECONDS); } - long size = ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); + long size = ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.debug(key + " Unacked Message Tracker size is " + size); assertEquals(size, totalMessages); @@ -338,7 +338,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { message = consumer.receive(500, TimeUnit.MILLISECONDS); } while (message != null); - size = ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); + size = ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.debug(key + " Unacked Message Tracker size is " + size); assertEquals(size, 0); assertEquals(hSet.size(), totalMessages); @@ -361,14 +361,14 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { consumer.acknowledge(message); message = consumer.receive(100, TimeUnit.MILLISECONDS); } - size = ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); + size = ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.debug(key + " Unacked Message Tracker size is " + size); assertEquals(size, 0); assertEquals(received, totalMessages); // 8. Simulate ackTimeout - ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle(); - ((TopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); + ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle(); + ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); // 9. producer publish more messages for (int i = 0; i < totalMessages / 3; i++) { @@ -384,7 +384,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { log.debug("Consumer received : " + data); message = consumer.receive(100, TimeUnit.MILLISECONDS); } - size = ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); + size = ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.debug(key + " Unacked Message Tracker size is " + size); assertEquals(size, 30); @@ -402,7 +402,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { message = consumer.receive(100, TimeUnit.MILLISECONDS); } assertEquals(redelivered, 30); - size = ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); + size = ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); assertEquals(size, 0); @@ -447,7 +447,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof TopicsConsumerImpl); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); // 3. producer publish messages for (int i = 0; i < totalMessages / 3; i++) { @@ -468,7 +468,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { assertEquals(messageSet, totalMessages); // 4, unsubscribe topic3 - CompletableFuture<Void> unsubFuture = ((TopicsConsumerImpl<byte[]>) consumer).unsubscribeAsync(topicName3); + CompletableFuture<Void> unsubFuture = ((MultiTopicsConsumerImpl<byte[]>) consumer).unsubscribeAsync(topicName3); unsubFuture.get(); // 5. producer publish messages @@ -491,15 +491,15 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { assertEquals(messageSet, totalMessages * 2 / 3); // 7. use getter to verify internal topics number after un-subscribe topic3 - List<String> topics = ((TopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); - List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl) consumer).getConsumers(); + List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); + List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers(); assertEquals(topics.size(), 3); assertEquals(consumers.size(), 3); - assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 2); + assertTrue(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 2); // 8. re-subscribe topic3 - CompletableFuture<Void> subFuture = ((TopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3); + CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3); subFuture.get(); // 9. producer publish messages @@ -522,12 +522,12 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { assertEquals(messageSet, totalMessages); // 11. use getter to verify internal topics number after subscribe topic3 - topics = ((TopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); - consumers = ((TopicsConsumerImpl) consumer).getConsumers(); + topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); + consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers(); assertEquals(topics.size(), 6); assertEquals(consumers.size(), 6); - assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 3); + assertTrue(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 3); consumer.unsubscribe(); consumer.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2cbd642..2a88bf0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -914,7 +914,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex()); } if (partitionIndex != -1) { - // we should no longer track this message, PartitionedConsumerImpl will take care from now onwards + // we should no longer track this message, TopicsConsumer will take care from now onwards unAckedMessageTracker.remove(id); } else { unAckedMessageTracker.add(id); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java similarity index 74% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 1089cef..6da72ec 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -56,7 +56,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TopicsConsumerImpl<T> extends ConsumerBase<T> { +public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { // All topics should be in same namespace protected NamespaceName namespaceName; @@ -76,15 +76,15 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> { private final int sharedQueueResumeThreshold; // sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. - AtomicInteger numberTopicPartitions; + AtomicInteger allTopicPartitionsNumber; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ConsumerStatsRecorder stats; private final UnAckedMessageTracker unAckedMessageTracker; private final ConsumerConfigurationData<T> internalConfig; - TopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor, - CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) { + MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor, + CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) { super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema); @@ -95,7 +95,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> { this.consumers = new ConcurrentHashMap<>(); this.pausedConsumers = new ConcurrentLinkedQueue<>(); this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; - this.numberTopicPartitions = new AtomicInteger(0); + this.allTopicPartitionsNumber = new AtomicInteger(0); if (conf.getAckTimeoutMillis() != 0) { this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis()); @@ -109,7 +109,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> { if (conf.getTopicNames().isEmpty()) { this.namespaceName = null; setState(State.Ready); - subscribeFuture().complete(TopicsConsumerImpl.this); + subscribeFuture().complete(MultiTopicsConsumerImpl.this); return; } @@ -122,15 +122,15 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> { FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> { try { - if (numberTopicPartitions.get() > maxReceiverQueueSize) { - setMaxReceiverQueueSize(numberTopicPartitions.get()); + if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { + setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); } setState(State.Ready); // We have successfully created N consumers, so we can start receiving messages now startReceivingMessages(consumers.values().stream().collect(Collectors.toList())); - subscribeFuture().complete(TopicsConsumerImpl.this); + subscribeFuture().complete(MultiTopicsConsumerImpl.this); log.info("[{}] [{}] Created topics consumer with {} sub-consumers", - topic, subscription, numberTopicPartitions.get()); + topic, subscription, allTopicPartitionsNumber.get()); } catch (PulsarClientException e) { log.warn("[{}] Failed startReceivingMessages while subscribe topics: {}", topic, e.getMessage()); subscribeFuture.completeExceptionally(e); @@ -245,7 +245,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> { } else { // Enqueue the message so that it can be retrieved when application calls receive() // Waits for the queue to have space for the message - // This should never block cause TopicsConsumerImpl should always use GrowableArrayBlockingQueue + // This should never block cause MultiTopicsConsumerImpl should always use GrowableArrayBlockingQueue incomingMessages.put(topicMessage); } } catch (InterruptedException e) { @@ -271,7 +271,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> { log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, message.getMessageId()); } - listener.received(TopicsConsumerImpl.this, msg); + listener.received(MultiTopicsConsumerImpl.this, msg); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, message, t); @@ -613,117 +613,172 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> { } CompletableFuture<Void> subscribeResult = new CompletableFuture<>(); - final AtomicInteger partitionNumber = new AtomicInteger(0); - client.getPartitionedTopicMetadata(topicName).thenAccept(metadata -> { - if (log.isDebugEnabled()) { - log.debug("Received topic {} metadata.partitions: {}", topicName, metadata.partitions); - } + client.getPartitionedTopicMetadata(topicName) + .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, topicName, metadata.partitions)) + .exceptionally(ex1 -> { + log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage()); + subscribeResult.completeExceptionally(ex1); + return null; + }); - List<CompletableFuture<Consumer<T>>> futureList; - - if (metadata.partitions > 1) { - this.topics.putIfAbsent(topicName, metadata.partitions); - numberTopicPartitions.addAndGet(metadata.partitions); - partitionNumber.addAndGet(metadata.partitions); - - futureList = IntStream - .range(0, partitionNumber.get()) - .mapToObj( - partitionIndex -> { - String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); - CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); - ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, partitionName, internalConfig, - client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema); - consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); - return subFuture; - }) - .collect(Collectors.toList()); - } else { - this.topics.putIfAbsent(topicName, 1); - numberTopicPartitions.incrementAndGet(); - partitionNumber.incrementAndGet(); + return subscribeResult; + } - CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); - ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, topicName, internalConfig, - client.externalExecutorProvider().getExecutor(), 0, subFuture, schema); - consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); + // create consumer for a single topic with already known partitions. + // first create a consumer with no topic, then do subscription for already know partitionedTopic. + public static <T> MultiTopicsConsumerImpl<T> createPartitionedConsumer(PulsarClientImpl client, + ConsumerConfigurationData<T> conf, + ExecutorService listenerExecutor, + CompletableFuture<Consumer<T>> subscribeFuture, + int numPartitions, + Schema<T> schema) { + checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 topic for partitioned consumer"); + + // get topic name, then remove it from conf, so constructor will create a consumer with no topic. + ConsumerConfigurationData cloneConf = conf.clone(); + String topicName = cloneConf.getSingleTopic(); + cloneConf.getTopicNames().remove(topicName); + + CompletableFuture<Consumer> future = new CompletableFuture<>(); + MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema); + + future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions)) + .thenRun(()-> subscribeFuture.complete(consumer)) + .exceptionally(e -> { + log.warn("Failed subscription for createPartitionedConsumer: {} {}, e:{}", + topicName, numPartitions, e); + subscribeFuture.completeExceptionally(((Throwable)e).getCause()); + return null; + });; + return consumer; + } - futureList = Collections.singletonList(subFuture); - } + // subscribe one more given topic, but already know the numberPartitions + private CompletableFuture<Void> subscribeAsync(String topicName, int numberPartitions) { + if (!topicNameValid(topicName)) { + return FutureUtil.failedFuture( + new PulsarClientException.AlreadyClosedException("Topic name not valid")); + } - FutureUtil.waitForAll(futureList) - .thenAccept(finalFuture -> { - try { - if (numberTopicPartitions.get() > maxReceiverQueueSize) { - setMaxReceiverQueueSize(numberTopicPartitions.get()); - } - int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum(); - checkState(numberTopicPartitions.get() == numTopics, - "numberTopicPartitions " + numberTopicPartitions.get() - + " not equals expected: " + numTopics); - - // We have successfully created new consumers, so we can start receiving messages for them - startReceivingMessages( - consumers.values().stream() - .filter(consumer1 -> { - String consumerTopicName = consumer1.getTopic(); - if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals( - TopicName.get(topicName).getPartitionedTopicName().toString())) { - return true; - } else { - return false; - } - }) - .collect(Collectors.toList())); - - subscribeResult.complete(null); - log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, numberTopicPartitions {}", - topic, subscription, topicName, numberTopicPartitions.get()); - if (this.namespaceName == null) { - this.namespaceName = TopicName.get(topicName).getNamespaceObject(); - } - return; - } catch (PulsarClientException e) { - handleSubscribeOneTopicError(topicName, e); - subscribeResult.completeExceptionally(e); - } - }) - .exceptionally(ex -> { - handleSubscribeOneTopicError(topicName, ex); - subscribeResult.completeExceptionally(ex); - return null; - }); - }).exceptionally(ex1 -> { - log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage()); - subscribeResult.completeExceptionally(ex1); - return null; - }); + if (getState() == State.Closing || getState() == State.Closed) { + return FutureUtil.failedFuture( + new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed")); + } + + CompletableFuture<Void> subscribeResult = new CompletableFuture<>(); + subscribeTopicPartitions(subscribeResult, topicName, numberPartitions); return subscribeResult; } - // handling failure during subscribe new topic, unsubscribe success created partitions - private void handleSubscribeOneTopicError(String topicName, Throwable error) { - log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer ", topic, topicName, error.getMessage()); + private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int partitionNumber) { + if (log.isDebugEnabled()) { + log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, partitionNumber); + } - consumers.values().stream().filter(consumer1 -> { - String consumerTopicName = consumer1.getTopic(); - if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) { - return true; - } else { - return false; - } - }).forEach(consumer2 -> { - consumer2.closeAsync().handle((ok, closeException) -> { - consumer2.subscribeFuture().completeExceptionally(error); + List<CompletableFuture<Consumer<T>>> futureList; + + if (partitionNumber > 1) { + this.topics.putIfAbsent(topicName, partitionNumber); + allTopicPartitionsNumber.addAndGet(partitionNumber); + + futureList = IntStream + .range(0, partitionNumber) + .mapToObj( + partitionIndex -> { + String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); + CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); + ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, partitionName, internalConfig, + client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema); + consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); + return subFuture; + }) + .collect(Collectors.toList()); + } else { + this.topics.putIfAbsent(topicName, 1); + allTopicPartitionsNumber.incrementAndGet(); + + CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); + ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, topicName, internalConfig, + client.externalExecutorProvider().getExecutor(), 0, subFuture, schema); + consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); + + futureList = Collections.singletonList(subFuture); + } + + FutureUtil.waitForAll(futureList) + .thenAccept(finalFuture -> { + try { + if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { + setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); + } + int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum(); + checkState(allTopicPartitionsNumber.get() == numTopics, + "allTopicPartitionsNumber " + allTopicPartitionsNumber.get() + + " not equals expected: " + numTopics); + + // We have successfully created new consumers, so we can start receiving messages for them + startReceivingMessages( + consumers.values().stream() + .filter(consumer1 -> { + String consumerTopicName = consumer1.getTopic(); + if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals( + TopicName.get(topicName).getPartitionedTopicName().toString())) { + return true; + } else { + return false; + } + }) + .collect(Collectors.toList())); + + subscribeResult.complete(null); + log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}", + topic, subscription, topicName, partitionNumber, allTopicPartitionsNumber.get()); + if (this.namespaceName == null) { + this.namespaceName = TopicName.get(topicName).getNamespaceObject(); + } + return; + } catch (PulsarClientException e) { + handleSubscribeOneTopicError(topicName, e, subscribeResult); + } + }) + .exceptionally(ex -> { + handleSubscribeOneTopicError(topicName, ex, subscribeResult); return null; }); - consumers.remove(consumer2.getTopic()); - }); + } - topics.remove(topicName); - checkState(numberTopicPartitions.get() == consumers.values().size()); + // handling failure during subscribe new topic, unsubscribe success created partitions + private void handleSubscribeOneTopicError(String topicName, Throwable error, CompletableFuture<Void> subscribeFuture) { + log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage()); + + client.externalExecutorProvider().getExecutor().submit(() -> { + AtomicInteger toCloseNum = new AtomicInteger(0); + consumers.values().stream().filter(consumer1 -> { + String consumerTopicName = consumer1.getTopic(); + if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) { + return true; + } else { + return false; + } + }).forEach(consumer2 -> { + toCloseNum.incrementAndGet(); + consumer2.closeAsync().whenComplete((r, ex) -> { + consumer2.subscribeFuture().completeExceptionally(error); + allTopicPartitionsNumber.decrementAndGet(); + consumers.remove(consumer2.getTopic()); + if (toCloseNum.decrementAndGet() == 0) { + log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer, subscribe error: {}", + topic, topicName, error.getMessage()); + topics.remove(topicName); + checkState(allTopicPartitionsNumber.get() == consumers.values().size()); + subscribeFuture.completeExceptionally(error); + } + return; + }); + }); + }); } // un-subscribe a given topic @@ -757,15 +812,15 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> { consumersToUnsub.forEach(consumer1 -> { consumers.remove(consumer1.getTopic()); pausedConsumers.remove(consumer1); - numberTopicPartitions.decrementAndGet(); + allTopicPartitionsNumber.decrementAndGet(); }); topics.remove(topicName); ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName); unsubscribeFuture.complete(null); - log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, numberTopicPartitions: {}", - topicName, subscription, consumerName, numberTopicPartitions); + log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, allTopicPartitionsNumber: {}", + topicName, subscription, consumerName, allTopicPartitionsNumber); } else { unsubscribeFuture.completeExceptionally(ex); setState(State.Failed); @@ -792,5 +847,5 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> { return consumers.values().stream().collect(Collectors.toList()); } - private static final Logger log = LoggerFactory.getLogger(TopicsConsumerImpl.class); + private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java deleted file mode 100644 index 9c952dd..0000000 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java +++ /dev/null @@ -1,553 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.FutureUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -public class PartitionedConsumerImpl<T> extends ConsumerBase<T> { - - private final List<ConsumerImpl<T>> consumers; - - // Queue of partition consumers on which we have stopped calling receiveAsync() because the - // shared incoming queue was full - private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers; - - // Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to - // resume receiving from the paused consumer partitions - private final int sharedQueueResumeThreshold; - - private final int numPartitions; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final ConsumerStatsRecorderImpl stats; - private final UnAckedMessageTracker unAckedMessageTracker; - - PartitionedConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, int numPartitions, - ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) { - super(client, conf.getSingleTopic(), conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()), - listenerExecutor, subscribeFuture, schema); - this.consumers = Lists.newArrayListWithCapacity(numPartitions); - this.pausedConsumers = new ConcurrentLinkedQueue<>(); - this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; - this.numPartitions = numPartitions; - - if (conf.getAckTimeoutMillis() != 0) { - this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis()); - } else { - this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED; - } - - stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl() : null; - checkArgument(conf.getReceiverQueueSize() > 0, - "Receiver queue size needs to be greater than 0 for Partitioned Topics"); - start(); - } - - private void start() { - AtomicReference<Throwable> subscribeFail = new AtomicReference<Throwable>(); - AtomicInteger completed = new AtomicInteger(); - ConsumerConfigurationData<T> internalConfig = getInternalConsumerConfig(); - for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) { - String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString(); - ConsumerImpl<T> consumer = new ConsumerImpl<>(client, partitionName, internalConfig, - client.externalExecutorProvider().getExecutor(), partitionIndex, new CompletableFuture<>(), schema); - consumers.add(consumer); - consumer.subscribeFuture().handle((cons, subscribeException) -> { - if (subscribeException != null) { - setState(State.Failed); - subscribeFail.compareAndSet(null, subscribeException); - client.cleanupConsumer(this); - } - if (completed.incrementAndGet() == numPartitions) { - if (subscribeFail.get() == null) { - try { - // We have successfully created N consumers, so we can start receiving messages now - starReceivingMessages(); - setState(State.Ready); - subscribeFuture().complete(PartitionedConsumerImpl.this); - log.info("[{}] [{}] Created partitioned consumer", topic, subscription); - return null; - } catch (PulsarClientException e) { - subscribeFail.set(e); - } - } - closeAsync().handle((ok, closeException) -> { - subscribeFuture().completeExceptionally(subscribeFail.get()); - client.cleanupConsumer(this); - return null; - }); - log.error("[{}] [{}] Could not create partitioned consumer.", topic, subscription, - subscribeFail.get().getCause()); - } - return null; - }); - } - } - - private void starReceivingMessages() throws PulsarClientException { - for (ConsumerImpl<T> consumer : consumers) { - consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize()); - receiveMessageFromConsumer(consumer); - } - } - - private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) { - consumer.receiveAsync().thenAccept(message -> { - // Process the message, add to the queue and trigger listener or async callback - messageReceived(message); - - // we're modifying pausedConsumers - lock.writeLock().lock(); - try { - int size = incomingMessages.size(); - if (size >= maxReceiverQueueSize - || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) { - // mark this consumer to be resumed later: if No more space left in shared queue, - // or if any consumer is already paused (to create fair chance for already paused consumers) - pausedConsumers.add(consumer); - } else { - // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid - // recursion and stack overflow - client.eventLoopGroup().execute(() -> { - receiveMessageFromConsumer(consumer); - }); - } - } finally { - lock.writeLock().unlock(); - } - }); - } - - private void resumeReceivingFromPausedConsumersIfNeeded() { - lock.readLock().lock(); - try { - if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) { - while (true) { - ConsumerImpl<T> consumer = pausedConsumers.poll(); - if (consumer == null) { - break; - } - - // if messages are readily available on consumer we will attempt to writeLock on the same thread - client.eventLoopGroup().execute(() -> { - receiveMessageFromConsumer(consumer); - }); - } - } - } finally { - lock.readLock().unlock(); - } - } - - @Override - protected Message<T> internalReceive() throws PulsarClientException { - Message<T> message; - try { - message = incomingMessages.take(); - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - resumeReceivingFromPausedConsumersIfNeeded(); - return message; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); - } - } - - @Override - protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException { - Message<T> message; - try { - message = incomingMessages.poll(timeout, unit); - if (message != null) { - unAckedMessageTracker.add(message.getMessageId()); - } - resumeReceivingFromPausedConsumersIfNeeded(); - return message; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); - } - } - - @Override - protected CompletableFuture<Message<T>> internalReceiveAsync() { - CompletableFuture<Message<T>> result = new CompletableFuture<>(); - Message<T> message; - try { - lock.writeLock().lock(); - message = incomingMessages.poll(0, TimeUnit.SECONDS); - if (message == null) { - pendingReceives.add(result); - } else { - unAckedMessageTracker.add(message.getMessageId()); - resumeReceivingFromPausedConsumersIfNeeded(); - result.complete(message); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - result.completeExceptionally(new PulsarClientException(e)); - } finally { - lock.writeLock().unlock(); - } - - return result; - } - - @Override - protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType, - Map<String,Long> properties) { - checkArgument(messageId instanceof MessageIdImpl); - - if (getState() != State.Ready) { - return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); - } - - if (ackType == AckType.Cumulative) { - return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException( - "Cumulative acknowledge not supported for partitioned topics")); - } else { - - ConsumerImpl<T> consumer = consumers.get(((MessageIdImpl) messageId).getPartitionIndex()); - return consumer.doAcknowledge(messageId, ackType, properties).thenRun(() -> - unAckedMessageTracker.remove(messageId)); - } - - } - - @Override - public CompletableFuture<Void> unsubscribeAsync() { - if (getState() == State.Closing || getState() == State.Closed) { - return FutureUtil.failedFuture( - new PulsarClientException.AlreadyClosedException("Partitioned Consumer was already closed")); - } - setState(State.Closing); - - AtomicReference<Throwable> unsubscribeFail = new AtomicReference<Throwable>(); - AtomicInteger completed = new AtomicInteger(numPartitions); - CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>(); - for (Consumer<T> consumer : consumers) { - if (consumer != null) { - consumer.unsubscribeAsync().handle((unsubscribed, ex) -> { - if (ex != null) { - unsubscribeFail.compareAndSet(null, ex); - } - if (completed.decrementAndGet() == 0) { - if (unsubscribeFail.get() == null) { - setState(State.Closed); - unAckedMessageTracker.close(); - unsubscribeFuture.complete(null); - log.info("[{}] [{}] Unsubscribed Partitioned Consumer", topic, subscription); - } else { - setState(State.Failed); - unsubscribeFuture.completeExceptionally(unsubscribeFail.get()); - log.error("[{}] [{}] Could not unsubscribe Partitioned Consumer", topic, subscription, - unsubscribeFail.get().getCause()); - } - } - - return null; - }); - } - - } - - return unsubscribeFuture; - } - - @Override - public CompletableFuture<Void> closeAsync() { - if (getState() == State.Closing || getState() == State.Closed) { - unAckedMessageTracker.close(); - return CompletableFuture.completedFuture(null); - } - setState(State.Closing); - - AtomicReference<Throwable> closeFail = new AtomicReference<Throwable>(); - AtomicInteger completed = new AtomicInteger(numPartitions); - CompletableFuture<Void> closeFuture = new CompletableFuture<>(); - for (Consumer<T> consumer : consumers) { - if (consumer != null) { - consumer.closeAsync().handle((closed, ex) -> { - if (ex != null) { - closeFail.compareAndSet(null, ex); - } - if (completed.decrementAndGet() == 0) { - if (closeFail.get() == null) { - setState(State.Closed); - unAckedMessageTracker.close(); - closeFuture.complete(null); - log.info("[{}] [{}] Closed Partitioned Consumer", topic, subscription); - client.cleanupConsumer(this); - // fail all pending-receive futures to notify application - failPendingReceive(); - } else { - setState(State.Failed); - closeFuture.completeExceptionally(closeFail.get()); - log.error("[{}] [{}] Could not close Partitioned Consumer", topic, subscription, - closeFail.get().getCause()); - } - } - - return null; - }); - } - - } - - return closeFuture; - } - - private void failPendingReceive() { - lock.readLock().lock(); - try { - if (listenerExecutor != null && !listenerExecutor.isShutdown()) { - while (!pendingReceives.isEmpty()) { - CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll(); - if (receiveFuture != null) { - receiveFuture.completeExceptionally( - new PulsarClientException.AlreadyClosedException("Consumer is already closed")); - } else { - break; - } - } - } - } finally { - lock.readLock().unlock(); - } - } - - @Override - public boolean isConnected() { - return consumers.stream().allMatch(ConsumerImpl::isConnected); - } - - void messageReceived(Message<T> message) { - lock.writeLock().lock(); - try { - unAckedMessageTracker.add(message.getMessageId()); - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Received message from partitioned-consumer {}", topic, subscription, message.getMessageId()); - } - // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue - if (!pendingReceives.isEmpty()) { - CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll(); - listenerExecutor.execute(() -> receivedFuture.complete(message)); - } else { - // Enqueue the message so that it can be retrieved when application calls receive() - // Waits for the queue to have space for the message - // This should never block cause PartitonedConsumerImpl should always use GrowableArrayBlockingQueue - incomingMessages.put(message); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - lock.writeLock().unlock(); - } - - if (listener != null) { - // Trigger the notification on the message listener in a separate thread to avoid blocking the networking - // thread while the message processing happens - listenerExecutor.execute(() -> { - Message<T> msg; - try { - msg = internalReceive(); - } catch (PulsarClientException e) { - log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); - return; - } - - try { - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, message.getMessageId()); - } - listener.received(PartitionedConsumerImpl.this, msg); - } catch (Throwable t) { - log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, message, - t); - } - }); - } - } - - @Override - String getHandlerName() { - return subscription; - } - - private ConsumerConfigurationData<T> getInternalConsumerConfig() { - ConsumerConfigurationData<T> internalConsumerConfig = new ConsumerConfigurationData<>(); - internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize()); - internalConsumerConfig.setSubscriptionName(conf.getSubscriptionName()); - internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType()); - internalConsumerConfig.setConsumerName(consumerName); - internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros()); - internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel()); - internalConsumerConfig.setProperties(conf.getProperties()); - internalConsumerConfig.setReadCompacted(conf.isReadCompacted()); - if (null != conf.getConsumerEventListener()) { - internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener()); - } - - int receiverQueueSize = Math.min(conf.getReceiverQueueSize(), - conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); - internalConsumerConfig.setReceiverQueueSize(receiverQueueSize); - - if (conf.getCryptoKeyReader() != null) { - internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader()); - internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction()); - } - if (conf.getAckTimeoutMillis() != 0) { - internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis()); - } - - return internalConsumerConfig; - } - - @Override - public void redeliverUnacknowledgedMessages() { - synchronized (this) { - for (ConsumerImpl<T> c : consumers) { - c.redeliverUnacknowledgedMessages(); - } - incomingMessages.clear(); - unAckedMessageTracker.clear(); - resumeReceivingFromPausedConsumersIfNeeded(); - } - } - - @Override - public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) { - checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl); - if (conf.getSubscriptionType() != SubscriptionType.Shared) { - // We cannot redeliver single messages if subscription type is not Shared - redeliverUnacknowledgedMessages(); - return; - } - removeExpiredMessagesFromQueue(messageIds); - messageIds.stream() - .map(messageId -> (MessageIdImpl)messageId) - .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, Collectors.toSet())) - .forEach((partitionIndex, messageIds1) -> - consumers.get(partitionIndex).redeliverUnacknowledgedMessages( - messageIds1.stream().map(mid -> (MessageId)mid).collect(Collectors.toSet()))); - resumeReceivingFromPausedConsumersIfNeeded(); - } - - @Override - public void seek(MessageId messageId) throws PulsarClientException { - try { - seekAsync(messageId).get(); - } catch (ExecutionException e) { - throw new PulsarClientException(e.getCause()); - } catch (InterruptedException e) { - throw new PulsarClientException(e); - } - } - - @Override - public CompletableFuture<Void> seekAsync(MessageId messageId) { - return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on partitioned topics")); - } - - List<ConsumerImpl<T>> getConsumers() { - return consumers; - } - - @Override - public int getAvailablePermits() { - return consumers.stream().mapToInt(ConsumerImpl::getAvailablePermits).sum(); - } - - @Override - public boolean hasReachedEndOfTopic() { - return consumers.stream().allMatch(Consumer::hasReachedEndOfTopic); - } - - @Override - public int numMessagesInQueue() { - return incomingMessages.size() + consumers.stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum(); - } - - @Override - public synchronized ConsumerStatsRecorderImpl getStats() { - if (stats == null) { - return null; - } - stats.reset(); - for (int i = 0; i < numPartitions; i++) { - stats.updateCumulativeStats(consumers.get(i).getStats()); - } - return stats; - } - - public UnAckedMessageTracker getUnAckedMessageTracker() { - return unAckedMessageTracker; - } - - private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) { - Message<T> peek = incomingMessages.peek(); - if (peek != null) { - if (!messageIds.contains(peek.getMessageId())) { - // first message is not expired, then no message is expired in queue. - return; - } - - // try not to remove elements that are added while we remove - Message<T> message = incomingMessages.poll(); - while (message != null) { - MessageIdImpl messageId = (MessageIdImpl) message.getMessageId(); - if (!messageIds.contains(messageId)) { - messageIds.add(messageId); - break; - } - message = incomingMessages.poll(); - } - } - } - - private static final Logger log = LoggerFactory.getLogger(PartitionedConsumerImpl.class); -} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java similarity index 89% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index f9cf550..d0b0c60 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -41,17 +41,17 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PatternTopicsConsumerImpl<T> extends TopicsConsumerImpl<T> implements TimerTask { +public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T> implements TimerTask { private final Pattern topicsPattern; private final TopicsChangedListener topicsChangeListener; private volatile Timeout recheckPatternTimeout = null; - public PatternTopicsConsumerImpl(Pattern topicsPattern, - PulsarClientImpl client, - ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, - CompletableFuture<Consumer<T>> subscribeFuture, - Schema<T> schema) { + public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, + PulsarClientImpl client, + ConsumerConfigurationData<T> conf, + ExecutorService listenerExecutor, + CompletableFuture<Consumer<T>> subscribeFuture, + Schema<T> schema) { super(client, conf, listenerExecutor, subscribeFuture, schema); this.topicsPattern = topicsPattern; @@ -86,7 +86,7 @@ public class PatternTopicsConsumerImpl<T> extends TopicsConsumerImpl<T> implemen } List<String> newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern); - List<String> oldTopics = PatternTopicsConsumerImpl.this.getTopics(); + List<String> oldTopics = PatternMultiTopicsConsumerImpl.this.getTopics(); futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics))); futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics))); @@ -100,7 +100,7 @@ public class PatternTopicsConsumerImpl<T> extends TopicsConsumerImpl<T> implemen }); // schedule the next re-check task - client.timer().newTimeout(PatternTopicsConsumerImpl.this, + client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this, Math.min(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES); } @@ -109,9 +109,9 @@ public class PatternTopicsConsumerImpl<T> extends TopicsConsumerImpl<T> implemen } interface TopicsChangedListener { - // unsubscribe and delete ConsumerImpl in the `consumers` map in `TopicsConsumerImpl` based on added topics. + // unsubscribe and delete ConsumerImpl in the `consumers` map in `MultiTopicsConsumerImpl` based on added topics. CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics); - // subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in `TopicsConsumerImpl`. + // subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in `MultiTopicsConsumerImpl`. CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics); } @@ -181,5 +181,5 @@ public class PatternTopicsConsumerImpl<T> extends TopicsConsumerImpl<T> implemen return recheckPatternTimeout; } - private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImpl.class); + private static final Logger log = LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 47e953b..dd07cf9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -384,8 +384,8 @@ public class PulsarClientImpl implements PulsarClient { // gets the next single threaded executor from the list of executors ExecutorService listenerThread = externalExecutorProvider.getExecutor(); if (metadata.partitions > 1) { - consumer = new PartitionedConsumerImpl<>(PulsarClientImpl.this, conf, metadata.partitions, listenerThread, - consumerSubscribedFuture, schema); + consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf, + listenerThread, consumerSubscribedFuture, metadata.partitions, schema); } else { consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, conf, listenerThread, -1, consumerSubscribedFuture, schema); @@ -406,7 +406,7 @@ public class PulsarClientImpl implements PulsarClient { private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) { CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>(); - ConsumerBase<T> consumer = new TopicsConsumerImpl<>(PulsarClientImpl.this, conf, + ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf, externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema); synchronized (consumers) { @@ -436,7 +436,7 @@ public class PulsarClientImpl implements PulsarClient { List<String> topicsList = topicsPatternFilter(topics, conf.getTopicsPattern()); conf.getTopicNames().addAll(topicsList); - ConsumerBase<T> consumer = new PatternTopicsConsumerImpl<>(conf.getTopicsPattern(), + ConsumerBase<T> consumer = new PatternMultiTopicsConsumerImpl<>(conf.getTopicsPattern(), PulsarClientImpl.this, conf, externalExecutorProvider.getExecutor(), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 6f32188..5ae452e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -27,7 +27,7 @@ public class TopicMessageImpl<T> implements Message<T> { private final String topicName; private final Message<T> msg; - private final MessageId msgId; + private final TopicMessageIdImpl msgId; TopicMessageImpl(String topicName, Message<T> msg) { @@ -49,6 +49,10 @@ public class TopicMessageImpl<T> implements Message<T> { return msgId; } + public MessageId getInnerMessageId() { + return msgId.getInnerMessageId(); + } + @Override public Map<String, String> getProperties() { return msg.getProperties(); -- To stop receiving notification emails like this one, please contact mme...@apache.org.