This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a93789  Issue #2312: add python multi-topics consumer support (#2451)
9a93789 is described below

commit 9a9378944c8a6c3cd9124c888ad58f67e127ceb0
Author: Jia Zhai <jiaz...@users.noreply.github.com>
AuthorDate: Mon Aug 27 17:25:58 2018 +0800

    Issue #2312: add python multi-topics consumer support (#2451)
    
    ### Motivation
    
    app python client support for multi-topics consumer
    
    ### Modifications
    
    add 2 methods in python client
    add unit test
---
 pulsar-client-cpp/python/pulsar/__init__.py | 205 ++++++++++++++++++++++++++++
 pulsar-client-cpp/python/pulsar_test.py     | 104 ++++++++++++++
 pulsar-client-cpp/python/src/client.cc      |  35 +++++
 pulsar-client-cpp/python/src/config.cc      |   2 +
 4 files changed, 346 insertions(+)

diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index 1185812..806c7e2 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -503,6 +503,211 @@ class Client:
         self._consumers.append(c)
         return c
 
+    def subscribe_topics(self, topics, subscription_name,
+                         consumer_type=ConsumerType.Exclusive,
+                         message_listener=None,
+                         receiver_queue_size=1000,
+                         max_total_receiver_queue_size_across_partitions=50000,
+                         consumer_name=None,
+                         unacked_messages_timeout_ms=None,
+                         broker_consumer_stats_cache_time_ms=30000,
+                         is_read_compacted=False,
+                         properties=None
+                         ):
+        """
+        Subscribe to the given topics and subscription combination.
+
+        **Args**
+
+        * `topics`: The list name of the topics.
+        * `subscription`: The name of the subscription.
+
+        **Options**
+
+        * `consumer_type`:
+          Select the subscription type to be used when subscribing to the 
topic.
+        * `message_listener`:
+          Sets a message listener for the consumer. When the listener is set,
+          the application will receive messages through it. Calls to
+          `consumer.receive()` will not be allowed. The listener function needs
+          to accept (consumer, message), for example:
+
+                #!python
+                def my_listener(consumer, message):
+                    # process message
+                    consumer.acknowledge(message)
+
+        * `receiver_queue_size`:
+          Sets the size of the consumer receive queue. The consumer receive
+          queue controls how many messages can be accumulated by the consumer
+          before the application calls `receive()`. Using a higher value could
+          potentially increase the consumer throughput at the expense of higher
+          memory utilization. Setting the consumer queue size to zero decreases
+          the throughput of the consumer by disabling pre-fetching of messages.
+          This approach improves the message distribution on shared 
subscription
+          by pushing messages only to those consumers that are ready to process
+          them. Neither receive with timeout nor partitioned topics can be used
+          if the consumer queue size is zero. The `receive()` function call
+          should not be interrupted when the consumer queue size is zero. The
+          default value is 1000 messages and should work well for most use
+          cases.
+        * `max_total_receiver_queue_size_across_partitions`
+          Set the max total receiver queue size across partitions.
+          This setting will be used to reduce the receiver queue size for 
individual partitions
+        * `consumer_name`:
+          Sets the consumer name.
+        * `unacked_messages_timeout_ms`:
+          Sets the timeout in milliseconds for unacknowledged messages. The
+          timeout needs to be greater than 10 seconds. An exception is thrown 
if
+          the given value is less than 10 seconds. If a successful
+          acknowledgement is not sent within the timeout, all the 
unacknowledged
+          messages are redelivered.
+        * `broker_consumer_stats_cache_time_ms`:
+          Sets the time duration for which the broker-side consumer stats will
+          be cached in the client.
+        * `properties`:
+          Sets the properties for the consumer. The properties associated with 
a consumer
+          can be used for identify a consumer at broker side.
+        """
+        _check_type(list, topics, 'topics')
+        _check_type(str, subscription_name, 'subscription_name')
+        _check_type(ConsumerType, consumer_type, 'consumer_type')
+        _check_type(int, receiver_queue_size, 'receiver_queue_size')
+        _check_type(int, max_total_receiver_queue_size_across_partitions,
+                    'max_total_receiver_queue_size_across_partitions')
+        _check_type_or_none(str, consumer_name, 'consumer_name')
+        _check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
+        _check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
+        _check_type(bool, is_read_compacted, 'is_read_compacted')
+        _check_type_or_none(dict, properties, 'properties')
+
+        conf = _pulsar.ConsumerConfiguration()
+        conf.consumer_type(consumer_type)
+        conf.read_compacted(is_read_compacted)
+        if message_listener:
+            conf.message_listener(message_listener)
+        conf.receiver_queue_size(receiver_queue_size)
+        
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
+        if consumer_name:
+            conf.consumer_name(consumer_name)
+        if unacked_messages_timeout_ms:
+            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
+        
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        c = Consumer()
+        c._consumer = self._client.subscribe_topics(topics, subscription_name, 
conf)
+        c._client = self
+        self._consumers.append(c)
+        return c
+
+    def subscribe_pattern(self, topics_pattern, subscription_name,
+                          consumer_type=ConsumerType.Exclusive,
+                          message_listener=None,
+                          receiver_queue_size=1000,
+                          
max_total_receiver_queue_size_across_partitions=50000,
+                          consumer_name=None,
+                          unacked_messages_timeout_ms=None,
+                          broker_consumer_stats_cache_time_ms=30000,
+                          is_read_compacted=False,
+                          pattern_auto_discovery_period=60,
+                          properties=None
+                          ):
+        """
+        Subscribe to multiple topics, which match given regexPattern, under 
the same namespace.
+
+        **Args**
+
+        * `topics_pattern`: The regex pattern to match topics.
+        * `subscription`: The name of the subscription.
+
+        **Options**
+
+        * `consumer_type`:
+          Select the subscription type to be used when subscribing to the 
topic.
+        * `message_listener`:
+          Sets a message listener for the consumer. When the listener is set,
+          the application will receive messages through it. Calls to
+          `consumer.receive()` will not be allowed. The listener function needs
+          to accept (consumer, message), for example:
+
+                #!python
+                def my_listener(consumer, message):
+                    # process message
+                    consumer.acknowledge(message)
+
+        * `receiver_queue_size`:
+          Sets the size of the consumer receive queue. The consumer receive
+          queue controls how many messages can be accumulated by the consumer
+          before the application calls `receive()`. Using a higher value could
+          potentially increase the consumer throughput at the expense of higher
+          memory utilization. Setting the consumer queue size to zero decreases
+          the throughput of the consumer by disabling pre-fetching of messages.
+          This approach improves the message distribution on shared 
subscription
+          by pushing messages only to those consumers that are ready to process
+          them. Neither receive with timeout nor partitioned topics can be used
+          if the consumer queue size is zero. The `receive()` function call
+          should not be interrupted when the consumer queue size is zero. The
+          default value is 1000 messages and should work well for most use
+          cases.
+        * `max_total_receiver_queue_size_across_partitions`
+          Set the max total receiver queue size across partitions.
+          This setting will be used to reduce the receiver queue size for 
individual partitions
+        * `consumer_name`:
+          Sets the consumer name.
+        * `unacked_messages_timeout_ms`:
+          Sets the timeout in milliseconds for unacknowledged messages. The
+          timeout needs to be greater than 10 seconds. An exception is thrown 
if
+          the given value is less than 10 seconds. If a successful
+          acknowledgement is not sent within the timeout, all the 
unacknowledged
+          messages are redelivered.
+        * `broker_consumer_stats_cache_time_ms`:
+          Sets the time duration for which the broker-side consumer stats will
+          be cached in the client.
+        * `pattern_auto_discovery_period`:
+          Periods of seconds for consumer to auto discover match topics.
+        * `properties`:
+          Sets the properties for the consumer. The properties associated with 
a consumer
+          can be used for identify a consumer at broker side.
+        """
+        _check_type(str, topics_pattern, 'topics_pattern')
+        _check_type(str, subscription_name, 'subscription_name')
+        _check_type(ConsumerType, consumer_type, 'consumer_type')
+        _check_type(int, receiver_queue_size, 'receiver_queue_size')
+        _check_type(int, max_total_receiver_queue_size_across_partitions,
+                    'max_total_receiver_queue_size_across_partitions')
+        _check_type_or_none(str, consumer_name, 'consumer_name')
+        _check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
+        _check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
+        _check_type(bool, is_read_compacted, 'is_read_compacted')
+        _check_type(int, pattern_auto_discovery_period, 
'pattern_auto_discovery_period')
+        _check_type_or_none(dict, properties, 'properties')
+
+        conf = _pulsar.ConsumerConfiguration()
+        conf.consumer_type(consumer_type)
+        conf.read_compacted(is_read_compacted)
+        if message_listener:
+            conf.message_listener(message_listener)
+        conf.receiver_queue_size(receiver_queue_size)
+        
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
+        if consumer_name:
+            conf.consumer_name(consumer_name)
+        if unacked_messages_timeout_ms:
+            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
+        
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+        conf.pattern_auto_discovery_period(pattern_auto_discovery_period)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        c = Consumer()
+        c._consumer = self._client.subscribe_pattern(topics_pattern, 
subscription_name, conf)
+        c._client = self
+        self._consumers.append(c)
+        return c
+
     def create_reader(self, topic, start_message_id,
                       reader_listener=None,
                       receiver_queue_size=1000,
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index 25564b7..7fd60e6 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -650,6 +650,110 @@ class PulsarTest(TestCase):
 
         client.close()
 
+    def test_topics_consumer(self):
+        client = Client(self.serviceUrl)
+        topic1 = 
'persistent://sample/standalone/ns/my-python-topics-consumer-1'
+        topic2 = 
'persistent://sample/standalone/ns/my-python-topics-consumer-2'
+        topic3 = 
'persistent://sample/standalone/ns/my-python-topics-consumer-3'
+        topics = [topic1, topic2, topic3]
+
+        url1 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-topics-consumer-1/partitions'
+        url2 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-topics-consumer-2/partitions'
+        url3 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-topics-consumer-3/partitions'
+
+        doHttpPut(url1, '2')
+        doHttpPut(url2, '3')
+        doHttpPut(url3, '4')
+
+        producer1 = client.create_producer(topic1)
+        producer2 = client.create_producer(topic2)
+        producer3 = client.create_producer(topic3)
+
+        consumer = client.subscribe_topics(topics,
+                                           'my-topics-consumer-sub',
+                                           consumer_type=ConsumerType.Shared,
+                                           receiver_queue_size=10
+                                           )
+
+        for i in range(100):
+            producer1.send('hello-1-%d' % i)
+
+        for i in range(100):
+            producer2.send('hello-2-%d' % i)
+
+        for i in range(100):
+            producer3.send('hello-3-%d' % i)
+
+
+        for i in range(300):
+            msg = consumer.receive()
+            consumer.acknowledge(msg)
+
+        try:
+        # No other messages should be received
+            consumer.receive(timeout_millis=500)
+            self.assertTrue(False)
+        except:
+            # Exception is expected
+            pass
+        client.close()
+
+    def test_topics_pattern_consumer(self):
+        client = Client(self.serviceUrl)
+
+        topics_pattern = 
'persistent://sample/standalone/ns/my-python-pattern-consumer.*'
+
+
+        consumer = client.subscribe_pattern(topics_pattern,
+                                            'my-pattern-consumer-sub',
+                                            consumer_type=ConsumerType.Shared,
+                                            receiver_queue_size=10,
+                                            pattern_auto_discovery_period=1
+                                            )
+
+        topic1 = 
'persistent://sample/standalone/ns/my-python-pattern-consumer-1'
+        topic2 = 
'persistent://sample/standalone/ns/my-python-pattern-consumer-2'
+        topic3 = 
'persistent://sample/standalone/ns/my-python-pattern-consumer-3'
+
+        url1 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-1/partitions'
+        url2 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-2/partitions'
+        url3 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-3/partitions'
+
+        doHttpPut(url1, '2')
+        doHttpPut(url2, '3')
+        doHttpPut(url3, '4')
+
+        producer1 = client.create_producer(topic1)
+        producer2 = client.create_producer(topic2)
+        producer3 = client.create_producer(topic3)
+
+        # wait enough time to trigger auto discovery
+        time.sleep(2)
+
+        for i in range(100):
+            producer1.send('hello-1-%d' % i)
+
+        for i in range(100):
+            producer2.send('hello-2-%d' % i)
+
+        for i in range(100):
+            producer3.send('hello-3-%d' % i)
+
+
+        for i in range(300):
+            msg = consumer.receive()
+            consumer.acknowledge(msg)
+
+        try:
+            # No other messages should be received
+            consumer.receive(timeout_millis=500)
+            self.assertTrue(False)
+        except:
+            # Exception is expected
+            pass
+        client.close()
+
+
     def _check_value_error(self, fun):
         try:
             fun()
diff --git a/pulsar-client-cpp/python/src/client.cc 
b/pulsar-client-cpp/python/src/client.cc
index b5295ac..4b6055a 100644
--- a/pulsar-client-cpp/python/src/client.cc
+++ b/pulsar-client-cpp/python/src/client.cc
@@ -43,6 +43,39 @@ Consumer Client_subscribe(Client& client, const std::string& 
topic, const std::s
     return consumer;
 }
 
+Consumer Client_subscribe_topics(Client& client, boost::python::list& topics,
+                                 const std::string& subscriptionName, const 
ConsumerConfiguration& conf) {
+    Consumer consumer;
+    Result res;
+
+    std::vector<std::string> topics_vector;
+
+    for (int i = 0; i < len(topics); i ++) {
+        std::string content = boost::python::extract<std::string>(topics[i]);
+        topics_vector.push_back(content);
+    }
+
+    Py_BEGIN_ALLOW_THREADS
+        res = client.subscribe(topics_vector, subscriptionName, conf, 
consumer);
+    Py_END_ALLOW_THREADS
+
+    CHECK_RESULT(res);
+    return consumer;
+}
+
+Consumer Client_subscribe_pattern(Client& client, const std::string& 
topic_pattern, const std::string& subscriptionName,
+                                 const ConsumerConfiguration& conf) {
+    Consumer consumer;
+    Result res;
+
+    Py_BEGIN_ALLOW_THREADS
+        res = client.subscribeWithRegex(topic_pattern, subscriptionName, conf, 
consumer);
+    Py_END_ALLOW_THREADS
+
+    CHECK_RESULT(res);
+    return consumer;
+}
+
 Reader Client_createReader(Client& client, const std::string& topic,
                            const MessageId& startMessageId,
                            const ReaderConfiguration& conf) {
@@ -73,6 +106,8 @@ void export_client() {
     class_<Client>("Client", init<const std::string&, const 
ClientConfiguration& >())
             .def("create_producer", &Client_createProducer)
             .def("subscribe", &Client_subscribe)
+            .def("subscribe_topics", &Client_subscribe_topics)
+            .def("subscribe_pattern", &Client_subscribe_pattern)
             .def("create_reader", &Client_createReader)
             .def("close", &Client_close)
             .def("shutdown", &Client::shutdown)
diff --git a/pulsar-client-cpp/python/src/config.cc 
b/pulsar-client-cpp/python/src/config.cc
index 9626049..c1bfeef 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -139,6 +139,8 @@ void export_config() {
             .def("unacked_messages_timeout_ms", 
&ConsumerConfiguration::setUnAckedMessagesTimeoutMs)
             .def("broker_consumer_stats_cache_time_ms", 
&ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs)
             .def("broker_consumer_stats_cache_time_ms", 
&ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
+            .def("pattern_auto_discovery_period", 
&ConsumerConfiguration::getPatternAutoDiscoveryPeriod)
+            .def("pattern_auto_discovery_period", 
&ConsumerConfiguration::setPatternAutoDiscoveryPeriod)
             .def("read_compacted", &ConsumerConfiguration::isReadCompacted)
             .def("read_compacted", &ConsumerConfiguration::setReadCompacted)
             .def("property", &ConsumerConfiguration::setProperty, 
return_self<>())

Reply via email to