This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 1fc4b35 Honor User specified Subscription Types while running functions (#1560) 1fc4b35 is described below commit 1fc4b35b5988936ef57f6f7e109e9afd23c138a8 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Apr 12 11:50:31 2018 -0700 Honor User specified Subscription Types while running functions (#1560) --- .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java | 7 +++++++ .../pulsar/functions/instance/processors/MessageProcessor.java | 4 +++- pulsar-functions/instance/src/main/python/python_instance.py | 9 ++++++--- .../instance/src/main/python/python_instance_main.py | 2 ++ .../org/apache/pulsar/functions/runtime/JavaInstanceMain.java | 4 ++++ .../java/org/apache/pulsar/functions/runtime/ProcessRuntime.java | 2 ++ .../org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java | 6 ++++-- .../java/org/apache/pulsar/functions/utils/FunctionConfig.java | 3 ++- 8 files changed, 30 insertions(+), 7 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 71ba928..34abea1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -293,6 +293,13 @@ public class CmdFunctions extends CmdBase { functionConfig.setParallelism(num); } + if (functionConfig.getSubscriptionType() != null + && functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER + && functionConfig.getProcessingGuarantees() != null + && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + throw new IllegalArgumentException("Effectively Once can only be acheived with Failover subscription"); + } + functionConfig.setAutoAck(true); inferMissingArguments(functionConfig); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java index 1167d73..5f0e242 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java @@ -42,8 +42,10 @@ public interface MessageProcessor extends AutoCloseable { FunctionDetails.SubscriptionType fnSubType = functionDetails.getSubscriptionType(); ProcessingGuarantees processingGuarantees = functionDetails.getProcessingGuarantees(); SubscriptionType subType; - if (null == fnSubType || FunctionDetails.SubscriptionType.SHARED == fnSubType) { + if (FunctionDetails.SubscriptionType.SHARED == fnSubType) { subType = SubscriptionType.Shared; + } else if (FunctionDetails.SubscriptionType.EXCLUSIVE == fnSubType) { + subType = SubscriptionType.Exclusive; } else { subType = SubscriptionType.Failover; } diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index c0165b6..53ca3f0 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -119,9 +119,12 @@ class PythonInstance(object): def run(self): # Setup consumers and input deserializers - mode = pulsar._pulsar.ConsumerType.Exclusive - if self.atmost_once: - mode = pulsar._pulsar.ConsumerType.Shared + mode = pulsar._pulsar.ConsumerType.Shared + if self.instance_config.function_details.subscriptionType == Function_pb2.FunctionDetails.SubscriptionType.Value('EXCLUSIVE'): + mode = pulsar._pulsar.ConsumerType.Exclusive + elif self.instance_config.function_details.subscriptionType == Function_pb2.FunctionDetails.SubscriptionType.Value('FAILOVER'): + mode = pulsar._pulsar.ConsumerType.Failover + subscription_name = str(self.instance_config.function_details.tenant) + "/" + \ str(self.instance_config.function_details.namespace) + "/" + \ str(self.instance_config.function_details.name) diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 0e9d857..42b1af3 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -67,6 +67,7 @@ def main(): parser.add_argument('--function_id', required=True, help='Function Id') parser.add_argument('--function_version', required=True, help='Function Version') parser.add_argument('--processing_guarantees', required=True, help='Processing Guarantees') + parser.add_argument('--subscription_type', required=True, help='Subscription Type') parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar Service Url') parser.add_argument('--port', required=True, help='Instance Port', type=int) parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples') @@ -109,6 +110,7 @@ def main(): if args.output_serde_classname != None and len(args.output_serde_classname) != 0: function_details.outputSerdeClassName = args.output_serde_classname function_details.processingGuarantees = Function_pb2.FunctionDetails.ProcessingGuarantees.Value(args.processing_guarantees) + function_details.subscriptionType = Function_pb2.FunctionDetails.SubscriptionType.Values(args.subscription_type) if args.auto_ack == "true": function_details.autoAck = True else: diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index c549eab..88f1ac7 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -103,6 +103,9 @@ public class JavaInstanceMain { @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n") protected String autoAck = "true"; + @Parameter(names = "--subscription_type", description = "What subscription type to use") + protected FunctionDetails.SubscriptionType subscriptionType; + private Server server; public JavaInstanceMain() { } @@ -150,6 +153,7 @@ public class JavaInstanceMain { } else { functionDetailsBuilder.setAutoAck(false); } + functionDetailsBuilder.setSubscriptionType(subscriptionType); if (userConfig != null && !userConfig.isEmpty()) { Type type = new TypeToken<Map<String, String>>(){}.getType(); Map<String, String> userConfigMap = new Gson().fromJson(userConfig, type); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 0e8b300..def06cf 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -106,6 +106,8 @@ class ProcessRuntime implements Runtime { args.add(instanceConfig.getFunctionDetails().getName()); args.add("--function_classname"); args.add(instanceConfig.getFunctionDetails().getClassName()); + args.add("--subscription_type"); + args.add(instanceConfig.getFunctionDetails().getSubscriptionType().toString()); if (instanceConfig.getFunctionDetails().getLogTopic() != null && !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) { args.add("--log_topic"); diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index 48602e3..7042c26 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -101,7 +101,7 @@ public class ProcessRuntimeTest { ProcessRuntime container = factory.createContainer(config, userJarFile); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 41); + assertEquals(args.size(), 43); args.remove(args.size() - 1); String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dlog4j.configurationFile=java_instance_log4j2.yml " + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " -Dpulsar.log.file=" + config.getFunctionDetails().getName() @@ -112,6 +112,7 @@ public class ProcessRuntimeTest { + " --namespace " + config.getFunctionDetails().getNamespace() + " --name " + config.getFunctionDetails().getName() + " --function_classname " + config.getFunctionDetails().getClassName() + + " --subscription_type " + config.getFunctionDetails().getSubscriptionType() + " --log_topic " + config.getFunctionDetails().getLogTopic() + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + "-input2" + " --auto_ack false" @@ -129,7 +130,7 @@ public class ProcessRuntimeTest { ProcessRuntime container = factory.createContainer(config, userJarFile); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 40); + assertEquals(args.size(), 42); args.remove(args.size() - 1); String expectedArgs = "python " + pythonInstanceFile + " --py " + userJarFile + " --logging_directory " @@ -139,6 +140,7 @@ public class ProcessRuntimeTest { + " --namespace " + config.getFunctionDetails().getNamespace() + " --name " + config.getFunctionDetails().getName() + " --function_classname " + config.getFunctionDetails().getClassName() + + " --subscription_type " + config.getFunctionDetails().getSubscriptionType() + " --log_topic " + config.getFunctionDetails().getLogTopic() + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + "-input2" + " --auto_ack false" diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index fe293ea..b855104 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -44,7 +44,8 @@ public class FunctionConfig { public enum SubscriptionType { SHARED, - EXCLUSIVE + EXCLUSIVE, + FAILOVER } public enum Runtime { -- To stop receiving notification emails like this one, please contact si...@apache.org.