This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 82aa2b8  Add support for dead letter topics for java functions (#2606)
82aa2b8 is described below

commit 82aa2b83359c31f71eae40bb8f068ce703f08b59
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Wed Sep 19 10:01:21 2018 -0700

    Add support for dead letter topics for java functions (#2606)
    
    * Added ability to specify dead letter topic to functions
    
    * Fix bug
    
    * Added an example function that fails on a particular message consistently
    
    * Revert change
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 25 ++++++++++++--
 .../functions/instance/JavaInstanceRunnable.java   |  5 +++
 .../pulsar/functions/source/PulsarSource.java      | 18 +++++-----
 .../functions/source/PulsarSourceConfig.java       |  2 ++
 .../api/examples/ConsistentlyFailingFunction.java  | 38 ++++++++++++++++++++++
 .../proto/src/main/proto/Function.proto            |  6 ++++
 .../pulsar/functions/utils/FunctionConfig.java     |  2 ++
 .../functions/utils/validation/ValidatorImpls.java | 12 +++++++
 8 files changed, 97 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index e8c8740..5284b59 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -24,7 +24,7 @@ import static java.util.Objects.isNull;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.commons.lang.StringUtils.isNotBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.apache.pulsar.functions.utils.Utils.fileExists;
@@ -45,7 +45,6 @@ import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
@@ -79,12 +78,12 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Resources;
+import org.apache.pulsar.functions.proto.Function.RetryDetails;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.utils.ConsumerConfig;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
@@ -321,6 +320,10 @@ public class CmdFunctions extends CmdBase {
         protected Long DEPRECATED_timeoutMs;
         @Parameter(names = "--timeout-ms", description = "The message timeout 
in milliseconds")
         protected Long timeoutMs;
+        @Parameter(names = "--max-message-retries", description = "How many 
times should we try to process a message before giving up")
+        protected Integer maxMessageRetries = -1;
+        @Parameter(names = "--dead-letter-topic", description = "The topic 
where all messages which could not be processed successfully are sent")
+        protected String deadLetterTopic;
         protected FunctionConfig functionConfig;
         protected String userCodeFile;
 
@@ -464,6 +467,13 @@ public class CmdFunctions extends CmdBase {
 
             functionConfig.setAutoAck(autoAck);
 
+            if (null != maxMessageRetries) {
+                functionConfig.setMaxMessageRetries(maxMessageRetries);
+            }
+            if (null != deadLetterTopic) {
+                functionConfig.setDeadLetterTopic(deadLetterTopic);
+            }
+
             if (null != jarFile) {
                 functionConfig.setJar(jarFile);
             }
@@ -717,6 +727,15 @@ public class CmdFunctions extends CmdBase {
                         
Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
             }
 
+            if (functionConfig.getMaxMessageRetries() >= 0) {
+                RetryDetails.Builder retryBuilder = RetryDetails.newBuilder();
+                
retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
+                if (isNotEmpty(functionConfig.getDeadLetterTopic())) {
+                    
retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic());
+                }
+                functionDetailsBuilder.setRetryDetails(retryBuilder);
+            }
+
             Map<String, Object> configs = new HashMap<>();
             configs.putAll(functionConfig.getUserConfig());
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b3f86ea..1e07516 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -547,6 +547,11 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
             if (sourceSpec.getTimeoutMs() > 0 ) {
                 pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
             }
+
+            if (this.instanceConfig.getFunctionDetails().getRetryDetails() != 
null) {
+                
pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
+                
pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
+            }
             object = new PulsarSource(this.client, pulsarSourceConfig,
                     
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
         } else {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 6eed8e0..afac782 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -31,14 +31,7 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.functions.api.Record;
@@ -97,6 +90,15 @@ public class PulsarSource<T> extends PushSource<T> 
implements MessageListener<T>
                 cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
             }
 
+            if (pulsarSourceConfig.getMaxMessageRetries() >= 0) {
+                DeadLetterPolicy.DeadLetterPolicyBuilder 
deadLetterPolicyBuilder = DeadLetterPolicy.builder();
+                
deadLetterPolicyBuilder.maxRedeliverCount(pulsarSourceConfig.getMaxMessageRetries());
+                if (pulsarSourceConfig.getDeadLetterTopic() != null && 
!pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
+                    
deadLetterPolicyBuilder.deadLetterTopic(pulsarSourceConfig.getDeadLetterTopic());
+                }
+                cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
+            }
+
             return cb.subscribeAsync();
         
}).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index f1cb09b..4e2afa7 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -37,6 +37,8 @@ public class PulsarSourceConfig {
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     SubscriptionType subscriptionType;
     private String subscriptionName;
+    private int maxMessageRetries;
+    private String deadLetterTopic;
 
     private Map<String, ConsumerConfig> topicSchema = new TreeMap<>();
 
diff --git 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java
 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java
new file mode 100644
index 0000000..792a574
--- /dev/null
+++ 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+
+/**
+ * This Function simulates a pulsar function encountering failing on a 
particular message.
+ */
+public class ConsistentlyFailingFunction implements Function<String, String> {
+    @Override
+    public String process(String input, Context context) {
+        if (input.equals("FAIL")) {
+            throw new RuntimeException("Failed");
+        } else {
+            return "SUCCESS";
+        }
+    }
+}
+
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index a76cf8d..482d901 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -40,6 +40,11 @@ message Resources {
     int64 disk = 3;
 }
 
+message RetryDetails {
+    int32 maxMessageRetries = 1;
+    string deadLetterTopic = 2;
+}
+
 message FunctionDetails {
     enum Runtime {
         JAVA = 0;
@@ -59,6 +64,7 @@ message FunctionDetails {
     SinkSpec sink = 12;
     Resources resources = 13;
     string packageUrl = 14; //present only if function submitted with 
package-url
+    RetryDetails retryDetails = 15;
 }
 
 message ConsumerSpec {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 1335f8c..dc36812 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -107,6 +107,8 @@ public class FunctionConfig {
     private Map<String, Object> userConfig;
     private Runtime runtime;
     private boolean autoAck;
+    private int maxMessageRetries;
+    private String deadLetterTopic;
     private String subName;
     @isPositiveNumber
     private int parallelism;
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index f60f3c0..e8acc28 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -514,6 +514,10 @@ public class ValidatorImpls {
             if (functionConfig.getWindowConfig() != null) {
                 throw new IllegalArgumentException("There is currently no 
support windowing in python");
             }
+
+            if (functionConfig.getMaxMessageRetries() >= 0) {
+                throw new IllegalArgumentException("Message retries not yet 
supported in python");
+            }
         }
 
         private static void verifyNoTopicClash(Collection<String> inputTopics, 
String outputTopic) throws IllegalArgumentException {
@@ -549,6 +553,14 @@ public class ValidatorImpls {
                 throw new IllegalArgumentException("Message timeout can only 
be specified with processing guarantee is "
                         + 
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
             }
+
+            if (functionConfig.getMaxMessageRetries() >= 0
+                    && functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                throw new IllegalArgumentException("MaxMessageRetries and 
Effectively once don't gel well");
+            }
+            if (functionConfig.getMaxMessageRetries() < 0 && 
!StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
+                throw new IllegalArgumentException("Dead Letter Topic 
specified, however max retries is set to infinity");
+            }
         }
 
         private static Collection<String> collectAllInputTopics(FunctionConfig 
functionConfig) {

Reply via email to