This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7c0fd11 allowing users to specify function jar in yml file (#1899) 7c0fd11 is described below commit 7c0fd11542a5a87d62628b12918af03065f9bef0 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon Jun 4 12:55:17 2018 -0700 allowing users to specify function jar in yml file (#1899) * allowing users to specify function jar in yml file * fixing unit tests --- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 4 +- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 43 +++++++++++++++++----- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 17 +++++++-- .../org/apache/pulsar/admin/cli/CmdSources.java | 13 ++++++- .../pulsar/functions/utils/FunctionConfig.java | 5 +++ .../apache/pulsar/functions/utils/SinkConfig.java | 3 ++ .../pulsar/functions/utils/SourceConfig.java | 3 ++ .../org/apache/pulsar/functions/utils/Utils.java | 4 ++ .../validation/ConfigValidationAnnotations.java | 9 +++++ .../functions/utils/validation/ValidatorImpls.java | 17 ++++++++- 10 files changed, 101 insertions(+), 17 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 0826956..20499e4 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -40,6 +40,7 @@ import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.Utils; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; 
@@ -76,7 +77,7 @@ import static org.testng.Assert.assertEquals; * Unit test of {@link CmdFunctions}. */ @Slf4j -@PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class }) +@PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class, Utils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" }) public class CmdFunctionsTest { @@ -127,6 +128,7 @@ public class CmdFunctionsTest { when(Reflections.classImplementsIface(anyString(), any())).thenReturn(true); when(Reflections.createInstance(eq(DummyFunction.class.getName()), any(File.class))).thenReturn(new DummyFunction()); when(Reflections.createInstance(eq(DefaultSerDe.class.getName()), any(File.class))).thenReturn(new DefaultSerDe(String.class)); + PowerMockito.stub(PowerMockito.method(Utils.class, "fileExists")).toReturn(true); } // @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 30916cd..e726cce 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -79,6 +79,7 @@ import static java.util.Objects.isNull; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; +import static org.apache.pulsar.functions.utils.Utils.fileExists; @Slf4j @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)") @@ -361,15 +362,18 @@ public class CmdFunctions extends CmdBase { functionConfig.setAutoAck(true); } - if (null != jarFile) { - functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); - userCodeFile = jarFile; - } else if (null != pyFile) 
{ - functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON); - userCodeFile = pyFile; - } else { - throw new ParameterException("Either a Java jar or a Python file needs to be specified for the function"); + functionConfig.setJar(jarFile); + } + + if (null != pyFile) { + functionConfig.setPy(pyFile); + } + + if (functionConfig.getJar() != null) { + userCodeFile = functionConfig.getJar(); + } else if (functionConfig.getPy() != null) { + userCodeFile = functionConfig.getPy(); } // infer default vaues @@ -378,8 +382,22 @@ public class CmdFunctions extends CmdBase { protected void validateFunctionConfigs(FunctionConfig functionConfig) { + if (functionConfig.getJar() != null && functionConfig.getPy() != null) { + throw new ParameterException("Either a Java jar or a Python file needs to" + + " be specified for the function. Cannot specify both."); + } + + if (functionConfig.getJar() == null && functionConfig.getPy() == null) { + throw new ParameterException("Either a Java jar or a Python file needs to" + + " be specified for the function. 
Please specify one."); + } + + if (!fileExists(userCodeFile)) { + throw new ParameterException("File " + userCodeFile + " does not exist"); + } + if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { - File file = new File(jarFile); + File file = new File(functionConfig.getJar()); ClassLoader userJarLoader; try { userJarLoader = Reflections.loadJar(file); @@ -394,6 +412,7 @@ public class CmdFunctions extends CmdBase { // Need to load jar and set context class loader before calling ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name()); } catch (Exception e) { + log.info("ex: {}", e, e); throw new ParameterException(e.getMessage()); } } @@ -416,6 +435,12 @@ public class CmdFunctions extends CmdBase { functionConfig.setParallelism(1); } + if (functionConfig.getJar() != null) { + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + } else if (functionConfig.getPy() != null) { + functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON); + } + WindowConfig windowConfig = functionConfig.getWindowConfig(); if (windowConfig != null) { WindowUtils.inferDefaultConfigs(windowConfig); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index fdbe291..607a4d2 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -51,6 +51,7 @@ import java.util.Map; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; +import static org.apache.pulsar.functions.utils.Utils.fileExists; import static org.apache.pulsar.functions.utils.Utils.getSinkType; import static org.apache.pulsar.functions.utils.Utils.loadConfig; @@ -175,7 +176,7 @@ public class 
CmdSinks extends CmdBase { if (null != tenant) { sinkConfig.setTenant(tenant); } - + if (null != namespace) { sinkConfig.setNamespace(namespace); } @@ -208,8 +209,8 @@ public class CmdSinks extends CmdBase { sinkConfig.setParallelism(parallelism); } - if (null == jarFile) { - throw new IllegalArgumentException("Connector JAR not specfied"); + if (null != jarFile) { + sinkConfig.setJar(jarFile); } sinkConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk)); @@ -233,7 +234,15 @@ public class CmdSinks extends CmdBase { } protected void validateSinkConfigs(SinkConfig sinkConfig) { - File file = new File(jarFile); + if (null == sinkConfig.getJar()) { + throw new ParameterException("Sink jar not specfied"); + } + + if (!fileExists(sinkConfig.getJar())) { + throw new ParameterException("Jar file " + sinkConfig.getJar() + " does not exist"); + } + + File file = new File(sinkConfig.getJar()); ClassLoader userJarLoader; try { userJarLoader = Reflections.loadJar(file); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 97d5fd9..ffc4891 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -47,6 +47,7 @@ import java.util.Map; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; +import static org.apache.pulsar.functions.utils.Utils.fileExists; import static org.apache.pulsar.functions.utils.Utils.getSourceType; import static org.apache.pulsar.functions.utils.Utils.loadConfig; @@ -191,8 +192,8 @@ public class CmdSources extends CmdBase { sourceConfig.setParallelism(parallelism); } - if (null == jarFile) { - throw new 
ParameterException("Source JAR not specfied"); + if (jarFile != null) { + sourceConfig.setJar(jarFile); } sourceConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk)); @@ -216,6 +217,14 @@ public class CmdSources extends CmdBase { } protected void validateSourceConfigs(SourceConfig sourceConfig) { + if (null == sourceConfig.getJar()) { + throw new ParameterException("Source jar not specfied"); + } + + if (!fileExists(sourceConfig.getJar())) { + throw new ParameterException("Jar file " + sourceConfig.getJar() + " does not exist"); + } + File file = new File(jarFile); ClassLoader userJarLoader; try { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index be40e01..9671d50 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -27,6 +27,7 @@ import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.utils.validation.ConfigValidation; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom; @@ -95,4 +96,8 @@ public class FunctionConfig { private WindowConfig windowConfig; @isPositiveNumber private Long timeoutMs; + @isFileExists + private String jar; + @isFileExists + private String py; } diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java index 9fa0307..f4ee77c 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java @@ -24,6 +24,7 @@ import lombok.Getter; import lombok.Setter; import lombok.ToString; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber; @@ -60,4 +61,6 @@ public class SinkConfig { private FunctionConfig.ProcessingGuarantees processingGuarantees; @isValidResources private Resources resources; + @isFileExists + private String jar; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java index cdede64..295f339 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java @@ -25,6 +25,7 @@ import lombok.Setter; import lombok.ToString; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass; import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources; @@ -62,4 +63,6 @@ public class SourceConfig { private FunctionConfig.ProcessingGuarantees processingGuarantees; @isValidResources private Resources resources; + @isFileExists + private String jar; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 3f7c2e4..c534ed3 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -205,4 +205,8 @@ public class Utils { return typeArg; } + + public static boolean fileExists(String file) { + return new File(file).exists(); + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java index 934f6f5..e6e0583 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java @@ -189,6 +189,15 @@ public class ConfigValidationAnnotations { } /** + * check if file exists + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isFileExists { + Class<?> validatorClass() default ValidatorImpls.FileValidator.class; + } + + /** * checks function config as a whole to make sure all fields are valid */ @Retention(RetentionPolicy.RUNTIME) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index 3e0a168..2ee6044 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -33,12 +33,14 @@ import org.apache.pulsar.functions.utils.SourceConfig; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.WindowConfig; +import java.io.File; import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Map; +import static org.apache.pulsar.functions.utils.Utils.fileExists; import static org.apache.pulsar.functions.utils.Utils.getSinkType; import static org.apache.pulsar.functions.utils.Utils.getSourceType; @@ -747,9 +749,22 @@ public class ValidatorImpls { } } + public static class FileValidator extends Validator { + @Override + public void validateField(String name, Object o) { + if (o == null) { + return; + } + new StringValidator().validateField(name, o); + if (!fileExists((String) o)) { + throw new IllegalArgumentException + (String.format("File %s specified in field '%s' does not exist", o, name)); + } + } + } - /** + /** * Validates basic types. */ public static class SimpleTypeValidator extends Validator { -- To stop receiving notification emails like this one, please contact si...@apache.org.