This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 82aa2b8 Add support for dead letter topics for java functions (#2606) 82aa2b8 is described below commit 82aa2b83359c31f71eae40bb8f068ce703f08b59 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Wed Sep 19 10:01:21 2018 -0700 Add support for dead letter topics for java functions (#2606) * Added ability to specify dead letter topic to functions * Fix bug * Added an example function that fails on a particular message consistently * Revert change --- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 25 ++++++++++++-- .../functions/instance/JavaInstanceRunnable.java | 5 +++ .../pulsar/functions/source/PulsarSource.java | 18 +++++----- .../functions/source/PulsarSourceConfig.java | 2 ++ .../api/examples/ConsistentlyFailingFunction.java | 38 ++++++++++++++++++++++ .../proto/src/main/proto/Function.proto | 6 ++++ .../pulsar/functions/utils/FunctionConfig.java | 2 ++ .../functions/utils/validation/ValidatorImpls.java | 12 +++++++ 8 files changed, 97 insertions(+), 11 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index e8c8740..5284b59 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -24,7 +24,7 @@ import static java.util.Objects.isNull; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.commons.lang.StringUtils.isNotBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.commons.lang.StringUtils.isNotEmpty; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.fileExists; @@ -45,7 +45,6 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Type; import java.net.MalformedURLException; @@ -79,12 +78,12 @@ import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function.ConsumerSpec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Resources; +import org.apache.pulsar.functions.proto.Function.RetryDetails; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.Function.SubscriptionType; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; -import org.apache.pulsar.functions.utils.ConsumerConfig; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; @@ -321,6 +320,10 @@ public class CmdFunctions extends CmdBase { protected Long DEPRECATED_timeoutMs; @Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds") protected Long timeoutMs; + @Parameter(names = "--max-message-retries", description = "How many times should we try to process a message before giving up") + protected Integer maxMessageRetries = -1; + @Parameter(names = "--dead-letter-topic", description = "The topic where all messages which could not be processed successfully are sent") + protected String deadLetterTopic; protected FunctionConfig functionConfig; protected String userCodeFile; @@ -464,6 +467,13 @@ public class CmdFunctions extends CmdBase { functionConfig.setAutoAck(autoAck); + if (null != maxMessageRetries) { + functionConfig.setMaxMessageRetries(maxMessageRetries); + } + if (null != deadLetterTopic) { + functionConfig.setDeadLetterTopic(deadLetterTopic); + } + if (null != jarFile) { functionConfig.setJar(jarFile); } @@ -717,6 +727,15 @@ public class CmdFunctions extends CmdBase { Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees())); } + if (functionConfig.getMaxMessageRetries() >= 0) { + RetryDetails.Builder retryBuilder = RetryDetails.newBuilder(); + retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries()); + if (isNotEmpty(functionConfig.getDeadLetterTopic())) { + retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic()); + } + functionDetailsBuilder.setRetryDetails(retryBuilder); + } + Map<String, Object> configs = new HashMap<>(); configs.putAll(functionConfig.getUserConfig()); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index b3f86ea..1e07516 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -547,6 +547,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { if (sourceSpec.getTimeoutMs() > 0 ) { pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs()); } + + if (this.instanceConfig.getFunctionDetails().getRetryDetails() != null) { + pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries()); + pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic()); + } object = new PulsarSource(this.client, pulsarSourceConfig, FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); } else { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 6eed8e0..afac782 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -31,14 +31,7 @@ import lombok.Builder; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageListener; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.functions.api.Record; @@ -97,6 +90,15 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); } + if (pulsarSourceConfig.getMaxMessageRetries() >= 0) { + DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterPolicyBuilder = DeadLetterPolicy.builder(); + deadLetterPolicyBuilder.maxRedeliverCount(pulsarSourceConfig.getMaxMessageRetries()); + if (pulsarSourceConfig.getDeadLetterTopic() != null && !pulsarSourceConfig.getDeadLetterTopic().isEmpty()) { + deadLetterPolicyBuilder.deadLetterTopic(pulsarSourceConfig.getDeadLetterTopic()); + } + cb.deadLetterPolicy(deadLetterPolicyBuilder.build()); + } + return cb.subscribeAsync(); }).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java index f1cb09b..4e2afa7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java @@ -37,6 +37,8 @@ public class PulsarSourceConfig { private FunctionConfig.ProcessingGuarantees processingGuarantees; SubscriptionType subscriptionType; private String subscriptionName; + private int maxMessageRetries; + private String deadLetterTopic; private Map<String, ConsumerConfig> topicSchema = new TreeMap<>(); diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java new file mode 100644 index 0000000..792a574 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + + +/** + * This Function simulates a pulsar function encountering failing on a particular message. + */ +public class ConsistentlyFailingFunction implements Function<String, String> { + @Override + public String process(String input, Context context) { + if (input.equals("FAIL")) { + throw new RuntimeException("Failed"); + } else { + return "SUCCESS"; + } + } +} + diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index a76cf8d..482d901 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -40,6 +40,11 @@ message Resources { int64 disk = 3; } +message RetryDetails { + int32 maxMessageRetries = 1; + string deadLetterTopic = 2; +} + message FunctionDetails { enum Runtime { JAVA = 0; @@ -59,6 +64,7 @@ message FunctionDetails { SinkSpec sink = 12; Resources resources = 13; string packageUrl = 14; //present only if function submitted with package-url + RetryDetails retryDetails = 15; } message ConsumerSpec { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index 1335f8c..dc36812 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -107,6 +107,8 @@ public class FunctionConfig { private Map<String, Object> userConfig; private Runtime runtime; private boolean autoAck; + private int maxMessageRetries; + private String deadLetterTopic; private String subName; @isPositiveNumber private int parallelism; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index f60f3c0..e8acc28 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -514,6 +514,10 @@ public class ValidatorImpls { if (functionConfig.getWindowConfig() != null) { throw new IllegalArgumentException("There is currently no support windowing in python"); } + + if (functionConfig.getMaxMessageRetries() >= 0) { + throw new IllegalArgumentException("Message retries not yet supported in python"); + } } private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException { @@ -549,6 +553,14 @@ public class ValidatorImpls { throw new IllegalArgumentException("Message timeout can only be specified with processing guarantee is " + FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name()); } + + if (functionConfig.getMaxMessageRetries() >= 0 + && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + throw new IllegalArgumentException("MaxMessageRetries and Effectively once don't gel well"); + } + if (functionConfig.getMaxMessageRetries() < 0 && !StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) { + throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity"); + } } private static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {