This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9a93789 Issue #2312: add python multi-topics consumer support (#2451) 9a93789 is described below commit 9a9378944c8a6c3cd9124c888ad58f67e127ceb0 Author: Jia Zhai <jiaz...@users.noreply.github.com> AuthorDate: Mon Aug 27 17:25:58 2018 +0800 Issue #2312: add python multi-topics consumer support (#2451) ### Motivation app python client support for multi-topics consumer ### Modifications add 2 methods in python client add unit test --- pulsar-client-cpp/python/pulsar/__init__.py | 205 ++++++++++++++++++++++++++++ pulsar-client-cpp/python/pulsar_test.py | 104 ++++++++++++++ pulsar-client-cpp/python/src/client.cc | 35 +++++ pulsar-client-cpp/python/src/config.cc | 2 + 4 files changed, 346 insertions(+) diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 1185812..806c7e2 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -503,6 +503,211 @@ class Client: self._consumers.append(c) return c + def subscribe_topics(self, topics, subscription_name, + consumer_type=ConsumerType.Exclusive, + message_listener=None, + receiver_queue_size=1000, + max_total_receiver_queue_size_across_partitions=50000, + consumer_name=None, + unacked_messages_timeout_ms=None, + broker_consumer_stats_cache_time_ms=30000, + is_read_compacted=False, + properties=None + ): + """ + Subscribe to the given topics and subscription combination. + + **Args** + + * `topics`: The list name of the topics. + * `subscription`: The name of the subscription. + + **Options** + + * `consumer_type`: + Select the subscription type to be used when subscribing to the topic. + * `message_listener`: + Sets a message listener for the consumer. When the listener is set, + the application will receive messages through it. Calls to + `consumer.receive()` will not be allowed. 
The listener function needs + to accept (consumer, message), for example: + + #!python + def my_listener(consumer, message): + # process message + consumer.acknowledge(message) + + * `receiver_queue_size`: + Sets the size of the consumer receive queue. The consumer receive + queue controls how many messages can be accumulated by the consumer + before the application calls `receive()`. Using a higher value could + potentially increase the consumer throughput at the expense of higher + memory utilization. Setting the consumer queue size to zero decreases + the throughput of the consumer by disabling pre-fetching of messages. + This approach improves the message distribution on shared subscription + by pushing messages only to those consumers that are ready to process + them. Neither receive with timeout nor partitioned topics can be used + if the consumer queue size is zero. The `receive()` function call + should not be interrupted when the consumer queue size is zero. The + default value is 1000 messages and should work well for most use + cases. + * `max_total_receiver_queue_size_across_partitions`: + Set the max total receiver queue size across partitions. + This setting will be used to reduce the receiver queue size for individual partitions. + * `consumer_name`: + Sets the consumer name. + * `unacked_messages_timeout_ms`: + Sets the timeout in milliseconds for unacknowledged messages. The + timeout needs to be greater than 10 seconds. An exception is thrown if + the given value is less than 10 seconds. If a successful + acknowledgement is not sent within the timeout, all the unacknowledged + messages are redelivered. + * `broker_consumer_stats_cache_time_ms`: + Sets the time duration for which the broker-side consumer stats will + be cached in the client. + * `properties`: + Sets the properties for the consumer. The properties associated with a consumer + can be used to identify a consumer on the broker side. 
+ """ + _check_type(list, topics, 'topics') + _check_type(str, subscription_name, 'subscription_name') + _check_type(ConsumerType, consumer_type, 'consumer_type') + _check_type(int, receiver_queue_size, 'receiver_queue_size') + _check_type(int, max_total_receiver_queue_size_across_partitions, + 'max_total_receiver_queue_size_across_partitions') + _check_type_or_none(str, consumer_name, 'consumer_name') + _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') + _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') + _check_type(bool, is_read_compacted, 'is_read_compacted') + _check_type_or_none(dict, properties, 'properties') + + conf = _pulsar.ConsumerConfiguration() + conf.consumer_type(consumer_type) + conf.read_compacted(is_read_compacted) + if message_listener: + conf.message_listener(message_listener) + conf.receiver_queue_size(receiver_queue_size) + conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) + if consumer_name: + conf.consumer_name(consumer_name) + if unacked_messages_timeout_ms: + conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) + conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) + if properties: + for k, v in properties.items(): + conf.property(k, v) + + c = Consumer() + c._consumer = self._client.subscribe_topics(topics, subscription_name, conf) + c._client = self + self._consumers.append(c) + return c + + def subscribe_pattern(self, topics_pattern, subscription_name, + consumer_type=ConsumerType.Exclusive, + message_listener=None, + receiver_queue_size=1000, + max_total_receiver_queue_size_across_partitions=50000, + consumer_name=None, + unacked_messages_timeout_ms=None, + broker_consumer_stats_cache_time_ms=30000, + is_read_compacted=False, + pattern_auto_discovery_period=60, + properties=None + ): + """ + Subscribe to multiple topics, which match given regexPattern, under the same 
namespace. + + **Args** + + * `topics_pattern`: The regex pattern to match topics. + * `subscription_name`: The name of the subscription. + + **Options** + + * `consumer_type`: + Select the subscription type to be used when subscribing to the topic. + * `message_listener`: + Sets a message listener for the consumer. When the listener is set, + the application will receive messages through it. Calls to + `consumer.receive()` will not be allowed. The listener function needs + to accept (consumer, message), for example: + + #!python + def my_listener(consumer, message): + # process message + consumer.acknowledge(message) + + * `receiver_queue_size`: + Sets the size of the consumer receive queue. The consumer receive + queue controls how many messages can be accumulated by the consumer + before the application calls `receive()`. Using a higher value could + potentially increase the consumer throughput at the expense of higher + memory utilization. Setting the consumer queue size to zero decreases + the throughput of the consumer by disabling pre-fetching of messages. + This approach improves the message distribution on shared subscription + by pushing messages only to those consumers that are ready to process + them. Neither receive with timeout nor partitioned topics can be used + if the consumer queue size is zero. The `receive()` function call + should not be interrupted when the consumer queue size is zero. The + default value is 1000 messages and should work well for most use + cases. + * `max_total_receiver_queue_size_across_partitions`: + Set the max total receiver queue size across partitions. + This setting will be used to reduce the receiver queue size for individual partitions. + * `consumer_name`: + Sets the consumer name. + * `unacked_messages_timeout_ms`: + Sets the timeout in milliseconds for unacknowledged messages. The + timeout needs to be greater than 10 seconds. An exception is thrown if + the given value is less than 10 seconds. 
If a successful + acknowledgement is not sent within the timeout, all the unacknowledged + messages are redelivered. + * `broker_consumer_stats_cache_time_ms`: + Sets the time duration for which the broker-side consumer stats will + be cached in the client. + * `pattern_auto_discovery_period`: + Period, in seconds, at which the consumer auto-discovers matching topics. + * `properties`: + Sets the properties for the consumer. The properties associated with a consumer + can be used to identify a consumer on the broker side. + """ + _check_type(str, topics_pattern, 'topics_pattern') + _check_type(str, subscription_name, 'subscription_name') + _check_type(ConsumerType, consumer_type, 'consumer_type') + _check_type(int, receiver_queue_size, 'receiver_queue_size') + _check_type(int, max_total_receiver_queue_size_across_partitions, + 'max_total_receiver_queue_size_across_partitions') + _check_type_or_none(str, consumer_name, 'consumer_name') + _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') + _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') + _check_type(bool, is_read_compacted, 'is_read_compacted') + _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period') + _check_type_or_none(dict, properties, 'properties') + + conf = _pulsar.ConsumerConfiguration() + conf.consumer_type(consumer_type) + conf.read_compacted(is_read_compacted) + if message_listener: + conf.message_listener(message_listener) + conf.receiver_queue_size(receiver_queue_size) + conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) + if consumer_name: + conf.consumer_name(consumer_name) + if unacked_messages_timeout_ms: + conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) + conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) + conf.pattern_auto_discovery_period(pattern_auto_discovery_period) + if properties: + for k, v in 
properties.items(): + conf.property(k, v) + + c = Consumer() + c._consumer = self._client.subscribe_pattern(topics_pattern, subscription_name, conf) + c._client = self + self._consumers.append(c) + return c + def create_reader(self, topic, start_message_id, reader_listener=None, receiver_queue_size=1000, diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 25564b7..7fd60e6 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -650,6 +650,110 @@ class PulsarTest(TestCase): client.close() + def test_topics_consumer(self): + client = Client(self.serviceUrl) + topic1 = 'persistent://sample/standalone/ns/my-python-topics-consumer-1' + topic2 = 'persistent://sample/standalone/ns/my-python-topics-consumer-2' + topic3 = 'persistent://sample/standalone/ns/my-python-topics-consumer-3' + topics = [topic1, topic2, topic3] + + url1 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-1/partitions' + url2 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-2/partitions' + url3 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-3/partitions' + + doHttpPut(url1, '2') + doHttpPut(url2, '3') + doHttpPut(url3, '4') + + producer1 = client.create_producer(topic1) + producer2 = client.create_producer(topic2) + producer3 = client.create_producer(topic3) + + consumer = client.subscribe_topics(topics, + 'my-topics-consumer-sub', + consumer_type=ConsumerType.Shared, + receiver_queue_size=10 + ) + + for i in range(100): + producer1.send('hello-1-%d' % i) + + for i in range(100): + producer2.send('hello-2-%d' % i) + + for i in range(100): + producer3.send('hello-3-%d' % i) + + + for i in range(300): + msg = consumer.receive() + consumer.acknowledge(msg) + + try: + # No other messages should be received + consumer.receive(timeout_millis=500) + self.assertTrue(False) + except: + # Exception is expected + 
pass + client.close() + + def test_topics_pattern_consumer(self): + client = Client(self.serviceUrl) + + topics_pattern = 'persistent://sample/standalone/ns/my-python-pattern-consumer.*' + + + consumer = client.subscribe_pattern(topics_pattern, + 'my-pattern-consumer-sub', + consumer_type=ConsumerType.Shared, + receiver_queue_size=10, + pattern_auto_discovery_period=1 + ) + + topic1 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-1' + topic2 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-2' + topic3 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-3' + + url1 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-1/partitions' + url2 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-2/partitions' + url3 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-3/partitions' + + doHttpPut(url1, '2') + doHttpPut(url2, '3') + doHttpPut(url3, '4') + + producer1 = client.create_producer(topic1) + producer2 = client.create_producer(topic2) + producer3 = client.create_producer(topic3) + + # wait enough time to trigger auto discovery + time.sleep(2) + + for i in range(100): + producer1.send('hello-1-%d' % i) + + for i in range(100): + producer2.send('hello-2-%d' % i) + + for i in range(100): + producer3.send('hello-3-%d' % i) + + + for i in range(300): + msg = consumer.receive() + consumer.acknowledge(msg) + + try: + # No other messages should be received + consumer.receive(timeout_millis=500) + self.assertTrue(False) + except: + # Exception is expected + pass + client.close() + + def _check_value_error(self, fun): try: fun() diff --git a/pulsar-client-cpp/python/src/client.cc b/pulsar-client-cpp/python/src/client.cc index b5295ac..4b6055a 100644 --- a/pulsar-client-cpp/python/src/client.cc +++ b/pulsar-client-cpp/python/src/client.cc @@ -43,6 +43,39 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s 
return consumer; } +Consumer Client_subscribe_topics(Client& client, boost::python::list& topics, + const std::string& subscriptionName, const ConsumerConfiguration& conf) { + Consumer consumer; + Result res; + + std::vector<std::string> topics_vector; + + for (int i = 0; i < len(topics); i ++) { + std::string content = boost::python::extract<std::string>(topics[i]); + topics_vector.push_back(content); + } + + Py_BEGIN_ALLOW_THREADS + res = client.subscribe(topics_vector, subscriptionName, conf, consumer); + Py_END_ALLOW_THREADS + + CHECK_RESULT(res); + return consumer; +} + +Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern, const std::string& subscriptionName, + const ConsumerConfiguration& conf) { + Consumer consumer; + Result res; + + Py_BEGIN_ALLOW_THREADS + res = client.subscribeWithRegex(topic_pattern, subscriptionName, conf, consumer); + Py_END_ALLOW_THREADS + + CHECK_RESULT(res); + return consumer; +} + Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf) { @@ -73,6 +106,8 @@ void export_client() { class_<Client>("Client", init<const std::string&, const ClientConfiguration& >()) .def("create_producer", &Client_createProducer) .def("subscribe", &Client_subscribe) + .def("subscribe_topics", &Client_subscribe_topics) + .def("subscribe_pattern", &Client_subscribe_pattern) .def("create_reader", &Client_createReader) .def("close", &Client_close) .def("shutdown", &Client::shutdown) diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index 9626049..c1bfeef 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -139,6 +139,8 @@ void export_config() { .def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs) .def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs) 
.def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs) + .def("pattern_auto_discovery_period", &ConsumerConfiguration::getPatternAutoDiscoveryPeriod) + .def("pattern_auto_discovery_period", &ConsumerConfiguration::setPatternAutoDiscoveryPeriod) .def("read_compacted", &ConsumerConfiguration::isReadCompacted) .def("read_compacted", &ConsumerConfiguration::setReadCompacted) .def("property", &ConsumerConfiguration::setProperty, return_self<>())