This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 669196c TopicPatterns are now supported by python functions (#2506) 669196c is described below commit 669196c27d3c2028a112d939774744dae8ab1b88 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Sep 6 01:01:03 2018 -0700 TopicPatterns are now supported by python functions (#2506) --- .../instance/src/main/python/python_instance.py | 26 ++++++++++++++-------- .../src/main/python/python_instance_main.py | 3 --- .../functions/utils/validation/ValidatorImpls.java | 9 -------- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index b43b882..ddd546e 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -172,7 +172,7 @@ class PythonInstance(object): self.consumers[topic] = self.pulsar_client.subscribe( str(topic), subscription_name, consumer_type=mode, - message_listener=partial(self.message_listener, topic, self.input_serdes[topic]), + message_listener=partial(self.message_listener, self.input_serdes[topic]), unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None ) @@ -183,12 +183,20 @@ class PythonInstance(object): serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName) self.input_serdes[topic] = serde_kclass() Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) - self.consumers[topic] = self.pulsar_client.subscribe( - str(topic), subscription_name, - consumer_type=mode, - message_listener=partial(self.message_listener, topic, self.input_serdes[topic]), - unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None - ) + if consumer_conf.isRegexPattern: + self.consumers[topic] = self.pulsar_client.subscribe_pattern( + str(topic), subscription_name, + consumer_type=mode, + message_listener=partial(self.message_listener, self.input_serdes[topic]), + unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None + ) + else: + self.consumers[topic] = self.pulsar_client.subscribe( + str(topic), subscription_name, + consumer_type=mode, + message_listener=partial(self.message_listener, self.input_serdes[topic]), + unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None + ) function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.className) if function_kclass is None: @@ -301,8 +309,8 @@ class PythonInstance(object): batching_max_publish_delay_ms=1, max_pending_messages=100000) - def message_listener(self, topic, serde, consumer, message): - item = InternalMessage(message, topic, serde, consumer) + def message_listener(self, serde, consumer, message): + item = InternalMessage(message, consumer.topic(), serde, consumer) self.queue.put(item, True) if self.atmost_once and self.auto_ack: consumer.acknowledge(message) diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 514d7fe..d9f1132 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -82,9 +82,6 @@ def main(): Log.info("Starting Python instance with %s" % str(args)) - if function_details.source.topicsPattern: - raise ValueError('topicsPattern is not supported by python client') - authentication = None use_tls = False tls_allow_insecure_connection = False diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index cbeb970..f264d2a 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -506,15 +506,6 @@ public class ValidatorImpls { if (functionConfig.getWindowConfig() != null) { throw new IllegalArgumentException("There is currently no support windowing in python"); } - - if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) { - throw new IllegalArgumentException("Topic-patterns is not supported for python runtime"); - } - functionConfig.getInputSpecs().forEach((topic, conf) -> { - if (conf.isRegexPattern()) { - throw new IllegalArgumentException("Topic-patterns is not supported for python runtime"); - } - }); } private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {