This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
commit bcb3d3f6d15995b74c51d0507fece569361928da Author: Jia Zhai <jiaz...@users.noreply.github.com> AuthorDate: Tue Aug 14 06:05:59 2018 +0800 [cpp] receiver queue size config acorss partitions in multi-topics-consumer (#2311) * catch up receiver queue size support in multi topics consumer * add python config --- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 5 ++++- pulsar-client-cpp/python/pulsar/__init__.py | 13 ++++++++++++ pulsar-client-cpp/python/src/config.cc | 4 ++++ .../client/impl/MultiTopicsConsumerImpl.java | 23 +++++++++++++--------- 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 7be197c..6750273 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -160,9 +160,12 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result, config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs()); config.setMessageListener( boost::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(), _1, _2)); - config.setReceiverQueueSize(conf_.getReceiverQueueSize()); int numPartitions = partitionMetadata->getPartitions() >= 1 ? partitionMetadata->getPartitions() : 1; + // Apply total limit of receiver queue size across partitions + config.setReceiverQueueSize( + std::min(conf_.getReceiverQueueSize(), + (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions))); Lock lock(mutex_); topicsPartitions_.insert(std::make_pair(topicName->toString(), numPartitions)); diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 434fb07..f3b560b 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -315,6 +315,7 @@ class Client: send_timeout_millis=30000, compression_type=CompressionType.NONE, max_pending_messages=1000, + max_pending_messages_across_partitions=50000, block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000, @@ -352,6 +353,9 @@ class Client: * `max_pending_messages`: Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. + * `max_pending_messages_across_partitions`: + Set the max size of the queue holding the messages pending to receive + an acknowledgment across partitions from the broker. * `block_if_queue_full`: Set whether `send_async` operations should block when the outgoing message queue is full. * `message_routing_mode`: @@ -364,6 +368,7 @@ class Client: _check_type(int, send_timeout_millis, 'send_timeout_millis') _check_type(CompressionType, compression_type, 'compression_type') _check_type(int, max_pending_messages, 'max_pending_messages') + _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions') _check_type(bool, block_if_queue_full, 'block_if_queue_full') _check_type(bool, batching_enabled, 'batching_enabled') _check_type(int, batching_max_messages, 'batching_max_messages') @@ -374,6 +379,7 @@ class Client: conf.send_timeout_millis(send_timeout_millis) conf.compression_type(compression_type) conf.max_pending_messages(max_pending_messages) + conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) conf.block_if_queue_full(block_if_queue_full) conf.batching_enabled(batching_enabled) conf.batching_max_messages(batching_max_messages) @@ -392,6 +398,7 @@ class Client: consumer_type=ConsumerType.Exclusive, message_listener=None, receiver_queue_size=1000, + max_total_receiver_queue_size_across_partitions=50000, consumer_name=None, unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, @@ -434,6 +441,9 @@ class Client: should not be interrupted when the consumer queue size is zero. The default value is 1000 messages and should work well for most use cases. + * `max_total_receiver_queue_size_across_partitions` + Set the max total receiver queue size across partitions. + This setting will be used to reduce the receiver queue size for individual partitions * `consumer_name`: Sets the consumer name. * `unacked_messages_timeout_ms`: @@ -450,6 +460,8 @@ class Client: _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') _check_type(int, receiver_queue_size, 'receiver_queue_size') + _check_type(int, max_total_receiver_queue_size_across_partitions, + 'max_total_receiver_queue_size_across_partitions') _check_type_or_none(str, consumer_name, 'consumer_name') _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') @@ -461,6 +473,7 @@ class Client: if message_listener: conf.message_listener(message_listener) conf.receiver_queue_size(receiver_queue_size) + conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) if consumer_name: conf.consumer_name(consumer_name) if unacked_messages_timeout_ms: diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index 9149add..9deee9a 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -108,6 +108,8 @@ void export_config() { .def("compression_type", &ProducerConfiguration::setCompressionType, return_self<>()) .def("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages) .def("max_pending_messages", &ProducerConfiguration::setMaxPendingMessages, return_self<>()) + .def("max_pending_messages_across_partitions", &ProducerConfiguration::getMaxPendingMessagesAcrossPartitions) + .def("max_pending_messages_across_partitions", &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>()) .def("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull) .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>()) .def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode) @@ -128,6 +130,8 @@ void export_config() { .def("message_listener", &ConsumerConfiguration_setMessageListener, return_self<>()) .def("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize) .def("receiver_queue_size", &ConsumerConfiguration::setReceiverQueueSize) + .def("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions) + .def("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions) .def("consumer_name", &ConsumerConfiguration::getConsumerName, return_value_policy<copy_const_reference>()) .def("consumer_name", &ConsumerConfiguration::setConsumerName) .def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 4a0c449..bbfb3f3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -64,7 +64,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { // Map <topic+partition, consumer>, when get do ACK, consumer will by find by topic name private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers; - // Map <topic, partitionNumber>, store partition number for each topic + // Map <topic, numPartitions>, store partition number for each topic protected final ConcurrentHashMap<String, Integer> topics; // Queue of partition consumers on which we have stopped calling receiveAsync() because the @@ -670,24 +670,29 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { return subscribeResult; } - private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int partitionNumber) { + private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) { if (log.isDebugEnabled()) { - log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, partitionNumber); + log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions); } List<CompletableFuture<Consumer<T>>> futureList; - if (partitionNumber > 1) { - this.topics.putIfAbsent(topicName, partitionNumber); - allTopicPartitionsNumber.addAndGet(partitionNumber); + if (numPartitions > 1) { + this.topics.putIfAbsent(topicName, numPartitions); + allTopicPartitionsNumber.addAndGet(numPartitions); + + int receiverQueueSize = Math.min(conf.getReceiverQueueSize(), + conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); + ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig(); + configurationData.setReceiverQueueSize(receiverQueueSize); futureList = IntStream - .range(0, partitionNumber) + .range(0, numPartitions) .mapToObj( partitionIndex -> { String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); - ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, partitionName, internalConfig, + ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, partitionName, configurationData, client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema); consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); return subFuture; @@ -732,7 +737,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { subscribeResult.complete(null); log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}", - topic, subscription, topicName, partitionNumber, allTopicPartitionsNumber.get()); + topic, subscription, topicName, numPartitions, allTopicPartitionsNumber.get()); if (this.namespaceName == null) { this.namespaceName = TopicName.get(topicName).getNamespaceObject(); }