This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7484664  Pulsar Functions CLI fixes (#1655)
7484664 is described below

commit 74846643303385cce847120b86c8342e50950492
Author: Luc Perkins <lucperk...@gmail.com>
AuthorDate: Thu Apr 26 16:45:36 2018 -0700

    Pulsar Functions CLI fixes (#1655)
    
    * add new argument checker function
    
    * fix misspelling in variable name
    
    * add check for class name to doJavaSubmitChecks
    
    * add default service URL as constant
    
    * fix input topic checking logic
    
    * remove 'got exception' line from Exception printing
    
    * add check for Python class name
    
    * add basic topic name validation
    
    * fix misspelling in getClassname call
    
    * use formatted string for inferred output topic name
    
    * ensure topics aren't used as both input and output
---
 .../java/org/apache/pulsar/admin/cli/CmdBase.java  |  1 -
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 92 ++++++++++++++++------
 2 files changed, 69 insertions(+), 24 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index bf8ce6c..682413c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -77,7 +77,6 @@ public abstract class CmdBase {
                 System.err.println("Reason: " + e.getMessage());
                 return false;
             } catch (Exception e) {
-                System.err.println("Got exception: " + e.getMessage());
                 e.printStackTrace();
                 return false;
             }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c1446ed..1369a69 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.admin.cli;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.isNull;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 import java.io.File;
@@ -27,10 +28,12 @@ import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.IntStream;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -40,6 +43,7 @@ import org.apache.bookkeeper.api.kv.result.KeyValue;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -76,6 +80,7 @@ import net.jodah.typetools.TypeResolver;
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions 
(lightweight, Lambda-style compute processes that work with Pulsar)")
 public class CmdFunctions extends CmdBase {
+    private static final String DEFAULT_SERVICE_URL = 
"pulsar://localhost:6650";
 
     private final LocalRunner localRunner;
     private final CreateFunction creater;
@@ -239,14 +244,20 @@ public class CmdFunctions extends CmdBase {
             }
 
             if (null != inputs) {
-                
Arrays.asList(inputs.split(",")).forEach(functionConfig.getInputs()::add);
+                List<String> inputTopics = Arrays.asList(inputs.split(","));
+                inputTopics.forEach(this::validateTopicName);
+                functionConfig.setInputs(inputTopics);
             }
             if (null != customSerdeInputString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, String> customSerdeInputMap = new 
Gson().fromJson(customSerdeInputString, type);
+                customSerdeInputMap.forEach((topic, serde) -> {
+                    validateTopicName(topic);
+                });
                 functionConfig.setCustomSerdeInputs(customSerdeInputMap);
             }
             if (null != output) {
+                validateTopicName(output);
                 functionConfig.setOutput(output);
             }
             if (null != logTopic) {
@@ -281,10 +292,13 @@ public class CmdFunctions extends CmdBase {
                 throw new RuntimeException("Either a Java jar or a Python file 
needs to be specified for the function");
             }
 
-            if (functionConfig.getInputs().size() == 0 && 
functionConfig.getCustomSerdeInputs().size() == 0) {
+            if (functionConfig.getInputs().isEmpty() && 
functionConfig.getCustomSerdeInputs().isEmpty()) {
                 throw new RuntimeException("No input topic(s) specified for 
the function");
             }
 
+            // Ensure that topics aren't being used as both input and output
+            verifyNoTopicClash(functionConfig.getInputs(), 
functionConfig.getOutput());;
+
             if (parallelism == null) {
                 if (functionConfig.getParallelism() == 0) {
                     functionConfig.setParallelism(1);
@@ -301,7 +315,7 @@ public class CmdFunctions extends CmdBase {
                     && functionConfig.getSubscriptionType() != 
FunctionConfig.SubscriptionType.FAILOVER
                     && functionConfig.getProcessingGuarantees() != null
                     && functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                throw new IllegalArgumentException("Effectively Once can only 
be acheived with Failover subscription");
+                throw new IllegalArgumentException("Effectively-once 
processing semantics can only be achieved using a Failover subscription type");
             }
 
             functionConfig.setAutoAck(true);
@@ -309,6 +323,10 @@ public class CmdFunctions extends CmdBase {
         }
 
         private void doJavaSubmitChecks(FunctionConfig functionConfig) {
+            if (isNull(className)) {
+                throw new IllegalArgumentException("You supplied a jar file 
but no main class");
+            }
+
             File file = new File(jarFile);
             // check if the function class exists in Jar and it implements 
Function class
             if (!Reflections.classExistsInJar(file, 
functionConfig.getClassName())) {
@@ -431,22 +449,32 @@ public class CmdFunctions extends CmdBase {
         }
 
         private void doPythonSubmitChecks(FunctionConfig functionConfig) {
+            if (functionConfig.getClassName() == null) {
+                throw new IllegalArgumentException("You specified a Python 
file but no main class name");
+            }
+
             if (functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                 throw new RuntimeException("Effectively-once processing 
guarantees not yet supported in Python");
             }
         }
 
+        private void validateTopicName(String topic) {
+            if (!TopicName.isValid(topic)) {
+                throw new IllegalArgumentException(String.format("The topic 
name %s is invalid", topic));
+            }
+        }
+
         private void inferMissingArguments(FunctionConfig functionConfig) {
-            if (functionConfig.getName() == null || 
functionConfig.getName().isEmpty()) {
+            if (StringUtils.isEmpty(functionConfig.getName())) {
                 inferMissingFunctionName(functionConfig);
             }
-            if (functionConfig.getTenant() == null || 
functionConfig.getTenant().isEmpty()) {
+            if (StringUtils.isEmpty(functionConfig.getTenant())) {
                 inferMissingTenant(functionConfig);
             }
-            if (functionConfig.getNamespace() == null || 
functionConfig.getNamespace().isEmpty()) {
+            if (StringUtils.isEmpty(functionConfig.getNamespace())) {
                 inferMissingNamespace(functionConfig);
             }
-            if (functionConfig.getOutput() == null || 
functionConfig.getOutput().isEmpty()) {
+            if (StringUtils.isEmpty(functionConfig.getOutput())) {
                 inferMissingOutput(functionConfig);
             }
         }
@@ -481,7 +509,8 @@ public class CmdFunctions extends CmdBase {
         private void inferMissingOutput(FunctionConfig functionConfig) {
             try {
                 String inputTopic = getUniqueInput(functionConfig);
-                functionConfig.setOutput(inputTopic + "-" + 
functionConfig.getName() + "-output");
+                String outputTopic = String.format("%s-%s-output", inputTopic, 
functionConfig.getName());
+                functionConfig.setOutput(outputTopic);
             } catch (IllegalArgumentException ex) {
                 // It might be that we really don't need an output topic
                 // So we cannot really throw an exception
@@ -512,16 +541,14 @@ public class CmdFunctions extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresent(functionConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
+            checkRequiredFields(functionConfig);
 
             String serviceUrl = admin.getServiceUrl();
             if (brokerServiceUrl != null) {
                 serviceUrl = brokerServiceUrl;
             }
             if (serviceUrl == null) {
-                serviceUrl = "pulsar://localhost:6650";
+                serviceUrl = DEFAULT_SERVICE_URL;
             }
             try (ProcessRuntimeFactory containerFactory = new 
ProcessRuntimeFactory(
                     serviceUrl, null, null, null)) {
@@ -564,9 +591,7 @@ public class CmdFunctions extends CmdBase {
     class CreateFunction extends FunctionDetailsCommand {
         @Override
         void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresent(functionConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
+            checkRequiredFields(functionConfig);
             admin.functions().createFunction(convert(functionConfig), 
userCodeFile);
             print("Created successfully");
         }
@@ -605,9 +630,7 @@ public class CmdFunctions extends CmdBase {
     class UpdateFunction extends FunctionDetailsCommand {
         @Override
         void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresent(functionConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
+            checkRequiredFields(functionConfig);
             admin.functions().updateFunction(convert(functionConfig), 
userCodeFile);
             print("Updated successfully");
         }
@@ -819,11 +842,34 @@ public class CmdFunctions extends CmdBase {
         return  mapper.readValue(file, FunctionConfig.class);
     }
 
-    public static boolean areAllRequiredFieldsPresent(FunctionConfig 
functionConfig) {
-        return functionConfig.getTenant() != null && 
functionConfig.getNamespace() != null
-                && functionConfig.getName() != null && 
functionConfig.getClassName() != null
-                && (functionConfig.getInputs().size() > 0 || 
functionConfig.getCustomSerdeInputs().size() > 0)
-                && functionConfig.getParallelism() > 0;
+    private static void verifyNoTopicClash(Collection<String> inputTopics, 
String outputTopic) throws IllegalArgumentException {
+        if (inputTopics.contains(outputTopic)) {
+            throw new IllegalArgumentException(
+                    String.format("Output topic %s is also being used as an 
input topic (topics must be one or the other)",
+                            outputTopic));
+        }
+    }
+
+    private static void checkRequiredFields(FunctionConfig config) throws 
IllegalArgumentException {
+        if (isNull(config.getTenant())) {
+            throw new IllegalArgumentException("You must specify a tenant for 
the function");
+        }
+
+        if (isNull(config.getNamespace())) {
+            throw new IllegalArgumentException("You must specify a namespace 
for the function");
+        }
+
+        if (isNull(config.getName())) {
+            throw new IllegalArgumentException("You must specify a name for 
the function");
+        }
+
+        if (isNull(config.getClassName())) {
+            throw new IllegalArgumentException("You must specify a class name 
for the function");
+        }
+
+        if (config.getInputs().isEmpty() && 
config.getCustomSerdeInputs().isEmpty()) {
+            throw new IllegalArgumentException("You must specify one or more 
input topics for the function");
+        }
     }
     
     private org.apache.pulsar.functions.proto.Function.FunctionDetails 
convertProto2(FunctionConfig functionConfig)

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to