This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit bcb3d3f6d15995b74c51d0507fece569361928da
Author: Jia Zhai <jiaz...@users.noreply.github.com>
AuthorDate: Tue Aug 14 06:05:59 2018 +0800

    [cpp] receiver queue size config acorss partitions in multi-topics-consumer 
(#2311)
    
    * catch up receiver queue size support in multi topics consumer
    
    * add python config
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  5 ++++-
 pulsar-client-cpp/python/pulsar/__init__.py        | 13 ++++++++++++
 pulsar-client-cpp/python/src/config.cc             |  4 ++++
 .../client/impl/MultiTopicsConsumerImpl.java       | 23 +++++++++++++---------
 4 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 7be197c..6750273 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -160,9 +160,12 @@ void 
MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
     
config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
     config.setMessageListener(
         boost::bind(&MultiTopicsConsumerImpl::messageReceived, 
shared_from_this(), _1, _2));
-    config.setReceiverQueueSize(conf_.getReceiverQueueSize());
 
     int numPartitions = partitionMetadata->getPartitions() >= 1 ? 
partitionMetadata->getPartitions() : 1;
+    // Apply total limit of receiver queue size across partitions
+    config.setReceiverQueueSize(
+        std::min(conf_.getReceiverQueueSize(),
+                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / 
numPartitions)));
 
     Lock lock(mutex_);
     topicsPartitions_.insert(std::make_pair(topicName->toString(), 
numPartitions));
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index 434fb07..f3b560b 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -315,6 +315,7 @@ class Client:
                         send_timeout_millis=30000,
                         compression_type=CompressionType.NONE,
                         max_pending_messages=1000,
+                        max_pending_messages_across_partitions=50000,
                         block_if_queue_full=False,
                         batching_enabled=False,
                         batching_max_messages=1000,
@@ -352,6 +353,9 @@ class Client:
         * `max_pending_messages`:
           Set the max size of the queue holding the messages pending to receive
           an acknowledgment from the broker.
+        * `max_pending_messages_across_partitions`:
+          Set the max size of the queue holding the messages pending to receive
+          an acknowledgment across partitions from the broker.
         * `block_if_queue_full`: Set whether `send_async` operations should
           block when the outgoing message queue is full.
         * `message_routing_mode`:
@@ -364,6 +368,7 @@ class Client:
         _check_type(int, send_timeout_millis, 'send_timeout_millis')
         _check_type(CompressionType, compression_type, 'compression_type')
         _check_type(int, max_pending_messages, 'max_pending_messages')
+        _check_type(int, max_pending_messages_across_partitions, 
'max_pending_messages_across_partitions')
         _check_type(bool, block_if_queue_full, 'block_if_queue_full')
         _check_type(bool, batching_enabled, 'batching_enabled')
         _check_type(int, batching_max_messages, 'batching_max_messages')
@@ -374,6 +379,7 @@ class Client:
         conf.send_timeout_millis(send_timeout_millis)
         conf.compression_type(compression_type)
         conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
         conf.block_if_queue_full(block_if_queue_full)
         conf.batching_enabled(batching_enabled)
         conf.batching_max_messages(batching_max_messages)
@@ -392,6 +398,7 @@ class Client:
                   consumer_type=ConsumerType.Exclusive,
                   message_listener=None,
                   receiver_queue_size=1000,
+                  max_total_receiver_queue_size_across_partitions=50000,
                   consumer_name=None,
                   unacked_messages_timeout_ms=None,
                   broker_consumer_stats_cache_time_ms=30000,
@@ -434,6 +441,9 @@ class Client:
           should not be interrupted when the consumer queue size is zero. The
           default value is 1000 messages and should work well for most use
           cases.
+        * `max_total_receiver_queue_size_across_partitions`
+          Set the max total receiver queue size across partitions.
+          This setting will be used to reduce the receiver queue size for 
individual partitions
         * `consumer_name`:
           Sets the consumer name.
         * `unacked_messages_timeout_ms`:
@@ -450,6 +460,8 @@ class Client:
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
         _check_type(int, receiver_queue_size, 'receiver_queue_size')
+        _check_type(int, max_total_receiver_queue_size_across_partitions,
+                    'max_total_receiver_queue_size_across_partitions')
         _check_type_or_none(str, consumer_name, 'consumer_name')
         _check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
         _check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
@@ -461,6 +473,7 @@ class Client:
         if message_listener:
             conf.message_listener(message_listener)
         conf.receiver_queue_size(receiver_queue_size)
+        
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
         if consumer_name:
             conf.consumer_name(consumer_name)
         if unacked_messages_timeout_ms:
diff --git a/pulsar-client-cpp/python/src/config.cc 
b/pulsar-client-cpp/python/src/config.cc
index 9149add..9deee9a 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -108,6 +108,8 @@ void export_config() {
             .def("compression_type", 
&ProducerConfiguration::setCompressionType, return_self<>())
             .def("max_pending_messages", 
&ProducerConfiguration::getMaxPendingMessages)
             .def("max_pending_messages", 
&ProducerConfiguration::setMaxPendingMessages, return_self<>())
+            .def("max_pending_messages_across_partitions", 
&ProducerConfiguration::getMaxPendingMessagesAcrossPartitions)
+            .def("max_pending_messages_across_partitions", 
&ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>())
             .def("block_if_queue_full", 
&ProducerConfiguration::getBlockIfQueueFull)
             .def("block_if_queue_full", 
&ProducerConfiguration::setBlockIfQueueFull, return_self<>())
             .def("partitions_routing_mode", 
&ProducerConfiguration::getPartitionsRoutingMode)
@@ -128,6 +130,8 @@ void export_config() {
             .def("message_listener", 
&ConsumerConfiguration_setMessageListener, return_self<>())
             .def("receiver_queue_size", 
&ConsumerConfiguration::getReceiverQueueSize)
             .def("receiver_queue_size", 
&ConsumerConfiguration::setReceiverQueueSize)
+            .def("max_total_receiver_queue_size_across_partitions", 
&ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions)
+            .def("max_total_receiver_queue_size_across_partitions", 
&ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
             .def("consumer_name", &ConsumerConfiguration::getConsumerName, 
return_value_policy<copy_const_reference>())
             .def("consumer_name", &ConsumerConfiguration::setConsumerName)
             .def("unacked_messages_timeout_ms", 
&ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4a0c449..bbfb3f3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -64,7 +64,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     // Map <topic+partition, consumer>, when get do ACK, consumer will by find 
by topic name
     private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
 
-    // Map <topic, partitionNumber>, store partition number for each topic
+    // Map <topic, numPartitions>, store partition number for each topic
     protected final ConcurrentHashMap<String, Integer> topics;
 
     // Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
@@ -670,24 +670,29 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         return subscribeResult;
     }
 
-    private void subscribeTopicPartitions(CompletableFuture<Void> 
subscribeResult, String topicName, int partitionNumber) {
+    private void subscribeTopicPartitions(CompletableFuture<Void> 
subscribeResult, String topicName, int numPartitions) {
         if (log.isDebugEnabled()) {
-            log.debug("Subscribe to topic {} metadata.partitions: {}", 
topicName, partitionNumber);
+            log.debug("Subscribe to topic {} metadata.partitions: {}", 
topicName, numPartitions);
         }
 
         List<CompletableFuture<Consumer<T>>> futureList;
 
-        if (partitionNumber > 1) {
-            this.topics.putIfAbsent(topicName, partitionNumber);
-            allTopicPartitionsNumber.addAndGet(partitionNumber);
+        if (numPartitions > 1) {
+            this.topics.putIfAbsent(topicName, numPartitions);
+            allTopicPartitionsNumber.addAndGet(numPartitions);
+
+            int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
+                conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / 
numPartitions);
+            ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
+            configurationData.setReceiverQueueSize(receiverQueueSize);
 
             futureList = IntStream
-                .range(0, partitionNumber)
+                .range(0, numPartitions)
                 .mapToObj(
                     partitionIndex -> {
                         String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                        ConsumerImpl<T> newConsumer = new 
ConsumerImpl<>(client, partitionName, internalConfig,
+                        ConsumerImpl<T> newConsumer = new 
ConsumerImpl<>(client, partitionName, configurationData,
                             client.externalExecutorProvider().getExecutor(), 
partitionIndex, subFuture, schema);
                         consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
                         return subFuture;
@@ -732,7 +737,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
                     subscribeResult.complete(null);
                     log.info("[{}] [{}] Success subscribe new topic {} in 
topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
-                        topic, subscription, topicName, partitionNumber, 
allTopicPartitionsNumber.get());
+                        topic, subscription, topicName, numPartitions, 
allTopicPartitionsNumber.get());
                     if (this.namespaceName == null) {
                         this.namespaceName = 
TopicName.get(topicName).getNamespaceObject();
                     }

Reply via email to