This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7484664 Pulsar Functions CLI fixes (#1655) 7484664 is described below commit 74846643303385cce847120b86c8342e50950492 Author: Luc Perkins <lucperk...@gmail.com> AuthorDate: Thu Apr 26 16:45:36 2018 -0700 Pulsar Functions CLI fixes (#1655) * add new argument checker function * fix misspelling in variable name * add check for class name to doJavaSubmitChecks * add default service URL as constant * fix input topic checking logic * remove 'got exception' line from Exception printing * add check for Python class name * add basic topic name validation * fix misspelling in getClassname call * use formatted string for inferred output topic name * ensure topics aren't used as both input and output --- .../java/org/apache/pulsar/admin/cli/CmdBase.java | 1 - .../org/apache/pulsar/admin/cli/CmdFunctions.java | 92 ++++++++++++++++------ 2 files changed, 69 insertions(+), 24 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java index bf8ce6c..682413c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java @@ -77,7 +77,6 @@ public abstract class CmdBase { System.err.println("Reason: " + e.getMessage()); return false; } catch (Exception e) { - System.err.println("Got exception: " + e.getMessage()); e.printStackTrace(); return false; } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index c1446ed..1369a69 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -20,6 +20,7 @@ package org.apache.pulsar.admin.cli; import static com.google.common.base.Preconditions.checkNotNull; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.isNull; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import java.io.File; @@ -27,10 +28,12 @@ import java.io.IOException; import java.lang.reflect.Type; import java.net.MalformedURLException; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.IntStream; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; @@ -40,6 +43,7 @@ import org.apache.bookkeeper.api.kv.result.KeyValue; import org.apache.bookkeeper.clients.StorageClientBuilder; import org.apache.bookkeeper.clients.config.StorageClientSettings; import org.apache.bookkeeper.clients.utils.NetUtils; +import org.apache.commons.lang.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.client.api.PulsarClientException; @@ -76,6 +80,7 @@ import net.jodah.typetools.TypeResolver; @Slf4j @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)") public class CmdFunctions extends CmdBase { + private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650"; private final LocalRunner localRunner; private final CreateFunction creater; @@ -239,14 +244,20 @@ public class CmdFunctions extends CmdBase { } if (null != inputs) { - Arrays.asList(inputs.split(",")).forEach(functionConfig.getInputs()::add); + List<String> inputTopics = Arrays.asList(inputs.split(",")); + inputTopics.forEach(this::validateTopicName); + functionConfig.setInputs(inputTopics); } if (null != customSerdeInputString) { Type type = new TypeToken<Map<String, String>>(){}.getType(); Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type); + customSerdeInputMap.forEach((topic, serde) -> { + validateTopicName(topic); + }); functionConfig.setCustomSerdeInputs(customSerdeInputMap); } if (null != output) { + validateTopicName(output); functionConfig.setOutput(output); } if (null != logTopic) { @@ -281,10 +292,13 @@ public class CmdFunctions extends CmdBase { throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function"); } - if (functionConfig.getInputs().size() == 0 && functionConfig.getCustomSerdeInputs().size() == 0) { + if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) { throw new RuntimeException("No input topic(s) specified for the function"); } + // Ensure that topics aren't being used as both input and output + verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput());; + if (parallelism == null) { if (functionConfig.getParallelism() == 0) { functionConfig.setParallelism(1); @@ -301,7 +315,7 @@ public class CmdFunctions extends CmdBase { && functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER && functionConfig.getProcessingGuarantees() != null && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { - throw new IllegalArgumentException("Effectively Once can only be acheived with Failover subscription"); + throw new IllegalArgumentException("Effectively-once processing semantics can only be achieved using a Failover subscription type"); } functionConfig.setAutoAck(true); @@ -309,6 +323,10 @@ public class CmdFunctions extends CmdBase { } private void doJavaSubmitChecks(FunctionConfig functionConfig) { + if (isNull(className)) { + throw new IllegalArgumentException("You supplied a jar file but no main class"); + } + File file = new File(jarFile); // check if the function class exists in Jar and it implements Function class if (!Reflections.classExistsInJar(file, functionConfig.getClassName())) { @@ -431,22 +449,32 @@ public class CmdFunctions extends CmdBase { } private void doPythonSubmitChecks(FunctionConfig functionConfig) { + if (functionConfig.getClassName() == null) { + throw new IllegalArgumentException("You specified a Python file but no main class name"); + } + if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python"); } } + private void validateTopicName(String topic) { + if (!TopicName.isValid(topic)) { + throw new IllegalArgumentException(String.format("The topic name %s is invalid", topic)); + } + } + private void inferMissingArguments(FunctionConfig functionConfig) { - if (functionConfig.getName() == null || functionConfig.getName().isEmpty()) { + if (StringUtils.isEmpty(functionConfig.getName())) { inferMissingFunctionName(functionConfig); } - if (functionConfig.getTenant() == null || functionConfig.getTenant().isEmpty()) { + if (StringUtils.isEmpty(functionConfig.getTenant())) { inferMissingTenant(functionConfig); } - if (functionConfig.getNamespace() == null || functionConfig.getNamespace().isEmpty()) { + if (StringUtils.isEmpty(functionConfig.getNamespace())) { inferMissingNamespace(functionConfig); } - if (functionConfig.getOutput() == null || functionConfig.getOutput().isEmpty()) { + if (StringUtils.isEmpty(functionConfig.getOutput())) { inferMissingOutput(functionConfig); } } @@ -481,7 +509,8 @@ public class CmdFunctions extends CmdBase { private void inferMissingOutput(FunctionConfig functionConfig) { try { String inputTopic = getUniqueInput(functionConfig); - functionConfig.setOutput(inputTopic + "-" + functionConfig.getName() + "-output"); + String outputTopic = String.format("%s-%s-output", inputTopic, functionConfig.getName()); + functionConfig.setOutput(outputTopic); } catch (IllegalArgumentException ex) { // It might be that we really don't need an output topic // So we cannot really throw an exception @@ -512,16 +541,14 @@ public class CmdFunctions extends CmdBase { @Override void runCmd() throws Exception { - if (!areAllRequiredFieldsPresent(functionConfig)) { - throw new RuntimeException("Missing arguments"); - } + checkRequiredFields(functionConfig); String serviceUrl = admin.getServiceUrl(); if (brokerServiceUrl != null) { serviceUrl = brokerServiceUrl; } if (serviceUrl == null) { - serviceUrl = "pulsar://localhost:6650"; + serviceUrl = DEFAULT_SERVICE_URL; } try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory( serviceUrl, null, null, null)) { @@ -564,9 +591,7 @@ public class CmdFunctions extends CmdBase { class CreateFunction extends FunctionDetailsCommand { @Override void runCmd() throws Exception { - if (!areAllRequiredFieldsPresent(functionConfig)) { - throw new RuntimeException("Missing arguments"); - } + checkRequiredFields(functionConfig); admin.functions().createFunction(convert(functionConfig), userCodeFile); print("Created successfully"); } @@ -605,9 +630,7 @@ public class CmdFunctions extends CmdBase { class UpdateFunction extends FunctionDetailsCommand { @Override void runCmd() throws Exception { - if (!areAllRequiredFieldsPresent(functionConfig)) { - throw new RuntimeException("Missing arguments"); - } + checkRequiredFields(functionConfig); admin.functions().updateFunction(convert(functionConfig), userCodeFile); print("Updated successfully"); } @@ -819,11 +842,34 @@ public class CmdFunctions extends CmdBase { return mapper.readValue(file, FunctionConfig.class); } - public static boolean areAllRequiredFieldsPresent(FunctionConfig functionConfig) { - return functionConfig.getTenant() != null && functionConfig.getNamespace() != null - && functionConfig.getName() != null && functionConfig.getClassName() != null - && (functionConfig.getInputs().size() > 0 || functionConfig.getCustomSerdeInputs().size() > 0) - && functionConfig.getParallelism() > 0; + private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException { + if (inputTopics.contains(outputTopic)) { + throw new IllegalArgumentException( + String.format("Output topic %s is also being used as an input topic (topics must be one or the other)", + outputTopic)); + } + } + + private static void checkRequiredFields(FunctionConfig config) throws IllegalArgumentException { + if (isNull(config.getTenant())) { + throw new IllegalArgumentException("You must specify a tenant for the function"); + } + + if (isNull(config.getNamespace())) { + throw new IllegalArgumentException("You must specify a namespace for the function"); + } + + if (isNull(config.getName())) { + throw new IllegalArgumentException("You must specify a name for the function"); + } + + if (isNull(config.getClassName())) { + throw new IllegalArgumentException("You must specify a class name for the function"); + } + + if (config.getInputs().isEmpty() && config.getCustomSerdeInputs().isEmpty()) { + throw new IllegalArgumentException("You must specify one or more input topics for the function"); + } } private org.apache.pulsar.functions.proto.Function.FunctionDetails convertProto2(FunctionConfig functionConfig) -- To stop receiving notification emails like this one, please contact mme...@apache.org.