This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 350037a  Make default processing guarantee AT_LEAST_ONCE (#1340)
350037a is described below

commit 350037a2d2cf5d1bacf28b119cc560feebad0de0
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Tue Mar 6 08:00:14 2018 -0800

    Make default processing guarantee AT_LEAST_ONCE (#1340)
    
    * Make default processing guarantee AT_LEAST_ONCE
    
    * add a simple test to ensure default processing guarantee is ATLEAST_ONCE
    remove nullcheck since protobuf handles that
---
 .../functions/instance/JavaInstanceRunnable.java   |  8 ++---
 .../proto/src/main/proto/Function.proto            |  4 +--
 .../pulsar/functions/proto/FunctionConfigTest.java | 41 ++++++++++++++++++++++
 .../pulsar/functions/runtime/ProcessRuntime.java   |  6 +---
 .../functions/runtime/ProcessRuntimeTest.java      |  4 +--
 5 files changed, 48 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 8f642e3..06904cf 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
@@ -57,7 +58,6 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionConfig;
 import 
org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.api.SerDe;
 import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneSinkTopicProducers;
@@ -66,8 +66,6 @@ import 
org.apache.pulsar.functions.instance.producers.SimpleOneSinkTopicProducer
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.Reflections;
-
-import java.util.function.Function;
 import org.apache.pulsar.functions.utils.Utils;
 
 /**
@@ -132,9 +130,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable, ConsumerEv
                                 PulsarClient pulsarClient,
                                 String stateStorageServiceUrl) {
         this.instanceConfig = instanceConfig;
-        this.processingGuarantees = 
instanceConfig.getFunctionConfig().getProcessingGuarantees() == null
-                ? FunctionConfig.ProcessingGuarantees.ATMOST_ONCE
-                : instanceConfig.getFunctionConfig().getProcessingGuarantees();
+        this.processingGuarantees = 
instanceConfig.getFunctionConfig().getProcessingGuarantees();
         this.fnCache = fnCache;
         this.queue = new 
LinkedBlockingDeque<>(instanceConfig.getMaxBufferedTuples());
         this.jarFile = jarFile;
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 6027200..2beab12 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -25,8 +25,8 @@ option java_outer_classname = "Function";
 
 message FunctionConfig {
     enum ProcessingGuarantees {
-        ATMOST_ONCE = 0;
-        ATLEAST_ONCE = 1;
+        ATLEAST_ONCE = 0; // [default value]
+        ATMOST_ONCE = 1;
         EFFECTIVELY_ONCE = 2;
     }
     enum SubscriptionType {
diff --git 
a/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionConfigTest.java
 
b/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionConfigTest.java
new file mode 100644
index 0000000..b4cb287
--- /dev/null
+++ 
b/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionConfigTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.proto;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+import 
org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link FunctionConfig}.
+ */
+public class FunctionConfigTest {
+
+    /**
+     * Make sure the default processing guarantee is always `ATLEAST_ONCE`.
+     */
+    @Test
+    public void testDefaultProcessingGuarantee() {
+        FunctionConfig fc = FunctionConfig.newBuilder().build();
+        assertEquals(ProcessingGuarantees.ATLEAST_ONCE, 
fc.getProcessingGuarantees());
+    }
+
+}
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 03bcb3b..d1ec87a 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -153,11 +153,7 @@ class ProcessRuntime implements Runtime {
             
args.add(instanceConfig.getFunctionConfig().getOutputSerdeClassName());
         }
         args.add("--processing_guarantees");
-        if (instanceConfig.getFunctionConfig().getProcessingGuarantees() != 
null) {
-            
args.add(String.valueOf(instanceConfig.getFunctionConfig().getProcessingGuarantees()));
-        } else {
-            args.add("ATMOST_ONCE");
-        }
+        
args.add(String.valueOf(instanceConfig.getFunctionConfig().getProcessingGuarantees()));
         args.add("--pulsar_serviceurl");
         args.add(pulsarServiceUrl);
         args.add("--max_buffered_tuples");
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 941babf..5b149af 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -115,7 +115,7 @@ public class ProcessRuntimeTest {
                 + " --auto_ack false"
                 + " --sink_topic " + config.getFunctionConfig().getOutput()
                 + " --output_serde_classname " + 
config.getFunctionConfig().getOutputSerdeClassName()
-                + " --processing_guarantees ATMOST_ONCE"
+                + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port";
         assertEquals(expectedArgs, String.join(" ", args));
@@ -141,7 +141,7 @@ public class ProcessRuntimeTest {
                 + " --auto_ack false"
                 + " --sink_topic " + config.getFunctionConfig().getOutput()
                 + " --output_serde_classname " + 
config.getFunctionConfig().getOutputSerdeClassName()
-                + " --processing_guarantees ATMOST_ONCE"
+                + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port";
         assertEquals(expectedArgs, String.join(" ", args));

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to