This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc4b35  Honor User specified Subscription Types while running 
functions (#1560)
1fc4b35 is described below

commit 1fc4b35b5988936ef57f6f7e109e9afd23c138a8
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Apr 12 11:50:31 2018 -0700

    Honor User specified Subscription Types while running functions (#1560)
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java  | 7 +++++++
 .../pulsar/functions/instance/processors/MessageProcessor.java   | 4 +++-
 pulsar-functions/instance/src/main/python/python_instance.py     | 9 ++++++---
 .../instance/src/main/python/python_instance_main.py             | 2 ++
 .../org/apache/pulsar/functions/runtime/JavaInstanceMain.java    | 4 ++++
 .../java/org/apache/pulsar/functions/runtime/ProcessRuntime.java | 2 ++
 .../org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java  | 6 ++++--
 .../java/org/apache/pulsar/functions/utils/FunctionConfig.java   | 3 ++-
 8 files changed, 30 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 71ba928..34abea1 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -293,6 +293,13 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setParallelism(num);
             }
 
+            if (functionConfig.getSubscriptionType() != null
+                    && functionConfig.getSubscriptionType() != 
FunctionConfig.SubscriptionType.FAILOVER
+                    && functionConfig.getProcessingGuarantees() != null
+                    && functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                throw new IllegalArgumentException("Effectively Once can only 
be achieved with Failover subscription");
+            }
+
             functionConfig.setAutoAck(true);
             inferMissingArguments(functionConfig);
         }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 1167d73..5f0e242 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -42,8 +42,10 @@ public interface MessageProcessor extends AutoCloseable {
         FunctionDetails.SubscriptionType fnSubType = 
functionDetails.getSubscriptionType();
         ProcessingGuarantees processingGuarantees = 
functionDetails.getProcessingGuarantees();
         SubscriptionType subType;
-        if (null == fnSubType || FunctionDetails.SubscriptionType.SHARED == 
fnSubType) {
+        if (FunctionDetails.SubscriptionType.SHARED == fnSubType) {
             subType = SubscriptionType.Shared;
+        } else if (FunctionDetails.SubscriptionType.EXCLUSIVE == fnSubType) {
+            subType = SubscriptionType.Exclusive;
         } else {
             subType = SubscriptionType.Failover;
         }
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index c0165b6..53ca3f0 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -119,9 +119,12 @@ class PythonInstance(object):
 
   def run(self):
     # Setup consumers and input deserializers
-    mode = pulsar._pulsar.ConsumerType.Exclusive
-    if self.atmost_once:
-      mode = pulsar._pulsar.ConsumerType.Shared
+    mode = pulsar._pulsar.ConsumerType.Shared
+    if self.instance_config.function_details.subscriptionType == 
Function_pb2.FunctionDetails.SubscriptionType.Value('EXCLUSIVE'):
+      mode = pulsar._pulsar.ConsumerType.Exclusive
+    elif self.instance_config.function_details.subscriptionType == 
Function_pb2.FunctionDetails.SubscriptionType.Value('FAILOVER'):
+      mode = pulsar._pulsar.ConsumerType.Failover
+
     subscription_name = str(self.instance_config.function_details.tenant) + 
"/" + \
                         str(self.instance_config.function_details.namespace) + 
"/" + \
                         str(self.instance_config.function_details.name)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 0e9d857..42b1af3 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -67,6 +67,7 @@ def main():
   parser.add_argument('--function_id', required=True, help='Function Id')
   parser.add_argument('--function_version', required=True, help='Function 
Version')
   parser.add_argument('--processing_guarantees', required=True, 
help='Processing Guarantees')
+  parser.add_argument('--subscription_type', required=True, help='Subscription 
Type')
   parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar 
Service Url')
   parser.add_argument('--port', required=True, help='Instance Port', type=int)
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum 
number of Buffered tuples')
@@ -109,6 +110,7 @@ def main():
   if args.output_serde_classname != None and len(args.output_serde_classname) 
!= 0:
     function_details.outputSerdeClassName = args.output_serde_classname
   function_details.processingGuarantees = 
Function_pb2.FunctionDetails.ProcessingGuarantees.Value(args.processing_guarantees)
+  function_details.subscriptionType = 
Function_pb2.FunctionDetails.SubscriptionType.Value(args.subscription_type)
   if args.auto_ack == "true":
     function_details.autoAck = True
   else:
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index c549eab..88f1ac7 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -103,6 +103,9 @@ public class JavaInstanceMain {
     @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
     protected String autoAck = "true";
 
+    @Parameter(names = "--subscription_type", description = "What subscription 
type to use")
+    protected FunctionDetails.SubscriptionType subscriptionType;
+
     private Server server;
 
     public JavaInstanceMain() { }
@@ -150,6 +153,7 @@ public class JavaInstanceMain {
         } else {
             functionDetailsBuilder.setAutoAck(false);
         }
+        functionDetailsBuilder.setSubscriptionType(subscriptionType);
         if (userConfig != null && !userConfig.isEmpty()) {
             Type type = new TypeToken<Map<String, String>>(){}.getType();
             Map<String, String> userConfigMap = new 
Gson().fromJson(userConfig, type);
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 0e8b300..def06cf 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -106,6 +106,8 @@ class ProcessRuntime implements Runtime {
         args.add(instanceConfig.getFunctionDetails().getName());
         args.add("--function_classname");
         args.add(instanceConfig.getFunctionDetails().getClassName());
+        args.add("--subscription_type");
+        
args.add(instanceConfig.getFunctionDetails().getSubscriptionType().toString());
         if (instanceConfig.getFunctionDetails().getLogTopic() != null &&
             !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
             args.add("--log_topic");
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 48602e3..7042c26 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -101,7 +101,7 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, 
userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 41);
+        assertEquals(args.size(), 43);
         args.remove(args.size() - 1);
         String expectedArgs = "java -cp " + javaInstanceJarFile + " 
-Dlog4j.configurationFile=java_instance_log4j2.yml "
                 + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " 
-Dpulsar.log.file=" + config.getFunctionDetails().getName()
@@ -112,6 +112,7 @@ public class ProcessRuntimeTest {
                 + " --namespace " + config.getFunctionDetails().getNamespace()
                 + " --name " + config.getFunctionDetails().getName()
                 + " --function_classname " + 
config.getFunctionDetails().getClassName()
+                + " --subscription_type " + 
config.getFunctionDetails().getSubscriptionType()
                 + " --log_topic " + config.getFunctionDetails().getLogTopic()
                 + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + 
"-input2"
                 + " --auto_ack false"
@@ -129,7 +130,7 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, 
userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 40);
+        assertEquals(args.size(), 42);
         args.remove(args.size() - 1);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
@@ -139,6 +140,7 @@ public class ProcessRuntimeTest {
                 + " --namespace " + config.getFunctionDetails().getNamespace()
                 + " --name " + config.getFunctionDetails().getName()
                 + " --function_classname " + 
config.getFunctionDetails().getClassName()
+                + " --subscription_type " + 
config.getFunctionDetails().getSubscriptionType()
                 + " --log_topic " + config.getFunctionDetails().getLogTopic()
                 + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + 
"-input2"
                 + " --auto_ack false"
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index fe293ea..b855104 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -44,7 +44,8 @@ public class FunctionConfig {
 
     public enum SubscriptionType {
         SHARED,
-        EXCLUSIVE
+        EXCLUSIVE,
+        FAILOVER
     }
 
     public enum Runtime {

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to