This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 350037a Make default processing guarantee AT_LEAST_ONCE (#1340) 350037a is described below commit 350037a2d2cf5d1bacf28b119cc560feebad0de0 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Tue Mar 6 08:00:14 2018 -0800 Make default processing guarantee AT_LEAST_ONCE (#1340) * Make default processing guarantee AT_LEAST_ONCE * add a simple test to ensure default processing guarantee is ATLEAST_ONCE remove nullcheck since protobuf handles that --- .../functions/instance/JavaInstanceRunnable.java | 8 ++--- .../proto/src/main/proto/Function.proto | 4 +-- .../pulsar/functions/proto/FunctionConfigTest.java | 41 ++++++++++++++++++++++ .../pulsar/functions/runtime/ProcessRuntime.java | 6 +--- .../functions/runtime/ProcessRuntimeTest.java | 4 +-- 5 files changed, 48 insertions(+), 15 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 8f642e3..06904cf 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; @@ -57,7 +58,6 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionConfig; import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees; import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.instance.InstanceConfig; import 
org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.instance.producers.MultiConsumersOneSinkTopicProducers; @@ -66,8 +66,6 @@ import org.apache.pulsar.functions.instance.producers.SimpleOneSinkTopicProducer import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.Reflections; - -import java.util.function.Function; import org.apache.pulsar.functions.utils.Utils; /** @@ -132,9 +130,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv PulsarClient pulsarClient, String stateStorageServiceUrl) { this.instanceConfig = instanceConfig; - this.processingGuarantees = instanceConfig.getFunctionConfig().getProcessingGuarantees() == null - ? FunctionConfig.ProcessingGuarantees.ATMOST_ONCE - : instanceConfig.getFunctionConfig().getProcessingGuarantees(); + this.processingGuarantees = instanceConfig.getFunctionConfig().getProcessingGuarantees(); this.fnCache = fnCache; this.queue = new LinkedBlockingDeque<>(instanceConfig.getMaxBufferedTuples()); this.jarFile = jarFile; diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 6027200..2beab12 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -25,8 +25,8 @@ option java_outer_classname = "Function"; message FunctionConfig { enum ProcessingGuarantees { - ATMOST_ONCE = 0; - ATLEAST_ONCE = 1; + ATLEAST_ONCE = 0; // [default value] + ATMOST_ONCE = 1; EFFECTIVELY_ONCE = 2; } enum SubscriptionType { diff --git a/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionConfigTest.java b/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionConfigTest.java new file mode 100644 index 0000000..b4cb287 --- 
/dev/null +++ b/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionConfigTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.proto; + +import static org.testng.Assert.assertEquals; + +import org.apache.pulsar.functions.proto.Function.FunctionConfig; +import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees; +import org.testng.annotations.Test; + +/** + * Unit test for {@link FunctionConfig}. + */ +public class FunctionConfigTest { + + /** + * Make sure the default processing guarantee is always `ATLEAST_ONCE`. 
+ */ + @Test + public void testDefaultProcessingGuarantee() { + FunctionConfig fc = FunctionConfig.newBuilder().build(); + assertEquals(ProcessingGuarantees.ATLEAST_ONCE, fc.getProcessingGuarantees()); + } + +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 03bcb3b..d1ec87a 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -153,11 +153,7 @@ class ProcessRuntime implements Runtime { args.add(instanceConfig.getFunctionConfig().getOutputSerdeClassName()); } args.add("--processing_guarantees"); - if (instanceConfig.getFunctionConfig().getProcessingGuarantees() != null) { - args.add(String.valueOf(instanceConfig.getFunctionConfig().getProcessingGuarantees())); - } else { - args.add("ATMOST_ONCE"); - } + args.add(String.valueOf(instanceConfig.getFunctionConfig().getProcessingGuarantees())); args.add("--pulsar_serviceurl"); args.add(pulsarServiceUrl); args.add("--max_buffered_tuples"); diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index 941babf..5b149af 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -115,7 +115,7 @@ public class ProcessRuntimeTest { + " --auto_ack false" + " --sink_topic " + config.getFunctionConfig().getOutput() + " --output_serde_classname " + config.getFunctionConfig().getOutputSerdeClassName() - + " --processing_guarantees ATMOST_ONCE" + + " --processing_guarantees ATLEAST_ONCE" + " 
--pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port"; assertEquals(expectedArgs, String.join(" ", args)); @@ -141,7 +141,7 @@ public class ProcessRuntimeTest { + " --auto_ack false" + " --sink_topic " + config.getFunctionConfig().getOutput() + " --output_serde_classname " + config.getFunctionConfig().getOutputSerdeClassName() - + " --processing_guarantees ATMOST_ONCE" + + " --processing_guarantees ATLEAST_ONCE" + " --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port"; assertEquals(expectedArgs, String.join(" ", args)); -- To stop receiving notification emails like this one, please contact mme...@apache.org.