This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ecec933 improving config validation (#1859) ecec933 is described below commit ecec9337094820c65fe5ecf128f8070a3538bddd Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed May 30 12:51:11 2018 -0700 improving config validation (#1859) * improving config validation * removing unnecessary file * removing unnecessary log * fix bug * fix potential NPE --- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 7 +- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 303 ++-------- .../windowing/WindowFunctionExecutor.java | 5 +- .../pulsar/functions/windowing/WindowUtils.java | 53 +- .../windowing/WindowFunctionExecutorTest.java | 4 +- pulsar-functions/utils/pom.xml | 5 + .../pulsar/functions/utils/FunctionConfig.java | 34 +- .../org/apache/pulsar/functions/utils/Utils.java | 73 +++ .../utils/validation/ConfigValidation.java | 119 ++++ .../validation/ConfigValidationAnnotations.java | 183 ++++++ .../utils/validation/ConfigValidationUtils.java | 177 ++++++ .../functions/utils/validation/Validator.java | 31 + .../functions/utils/validation/ValidatorImpls.java | 655 +++++++++++++++++++++ 13 files changed, 1352 insertions(+), 297 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index de4ab40..1cc239d 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -86,7 +86,12 @@ public class CmdFunctionsTest { private Functions functions; private CmdFunctions cmd; - public class DummyFunction implements Function<String, String> { + public static class DummyFunction implements Function<String, String> { + + public DummyFunction() { + + } + @Override public String process(String input, Context context) throws Exception { return null; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index d565568..94e17ad 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -18,14 +18,8 @@ */ package org.apache.pulsar.admin.cli; -import static com.google.common.base.Preconditions.checkNotNull; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.isNull; -import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; -import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; -import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; - import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; import com.beust.jcommander.Parameters; import com.beust.jcommander.converters.StringConverter; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,31 +29,12 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonParser; import com.google.gson.reflect.TypeToken; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.net.MalformedURLException; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; - +import net.jodah.typetools.TypeResolver; import org.apache.bookkeeper.api.StorageClient; import org.apache.bookkeeper.api.kv.Table; import org.apache.bookkeeper.api.kv.result.KeyValue; @@ -70,10 +45,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.api.SerDe; -import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; @@ -87,10 +59,33 @@ import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.WindowConfig; +import org.apache.pulsar.functions.utils.validation.ConfigValidation; import org.apache.pulsar.functions.windowing.WindowFunctionExecutor; import org.apache.pulsar.functions.windowing.WindowUtils; -import net.jodah.typetools.TypeResolver; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.net.MalformedURLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.isNull; +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; +import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; @Slf4j @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)") @@ -228,7 +223,7 @@ public class CmdFunctions extends CmdBase { @Parameter(names = "--userConfig", description = "User-defined config key/values") protected String userConfigString; @Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. the number of function instances to run)") - protected String parallelism; + protected Integer parallelism; @Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)") protected Double cpu; @Parameter(names = "--ram", description = "The ram in bytes that need to be allocated per function instance(applicable only to process/docker runtime)") @@ -275,19 +270,14 @@ public class CmdFunctions extends CmdBase { if (null != inputs) { List<String> inputTopics = Arrays.asList(inputs.split(",")); - inputTopics.forEach(this::validateTopicName); functionConfig.setInputs(inputTopics); } if (null != customSerdeInputString) { Type type = new TypeToken<Map<String, String>>(){}.getType(); Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type); - customSerdeInputMap.forEach((topic, serde) -> { - validateTopicName(topic); - }); functionConfig.setCustomSerdeInputs(customSerdeInputMap); } if (null != output) { - validateTopicName(output); functionConfig.setOutput(output); } if (null != logTopic) { @@ -320,37 +310,12 @@ public class CmdFunctions extends CmdBase { functionConfig.setUserConfig(new HashMap<>()); } - if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) { - throw new RuntimeException("No input topic(s) specified for the function"); - } - - // Ensure that topics aren't being used as both input and output - verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput()); - - if (parallelism == null) { - if (functionConfig.getParallelism() == 0) { - functionConfig.setParallelism(1); - } - } else { - int num = Integer.parseInt(parallelism); - if (num <= 0) { - throw new IllegalArgumentException("The parallelism factor (the number of instances) for the function must be positive"); - } - functionConfig.setParallelism(num); + if (parallelism != null) { + functionConfig.setParallelism(parallelism); } - com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0, "The cpu allocation for the function must be positive"); - com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0, "The ram allocation for the function must be positive"); - com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0, "The disk allocation for the function must be positive"); functionConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk)); - if (functionConfig.getSubscriptionType() != null - && functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER - && functionConfig.getProcessingGuarantees() != null - && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { - throw new IllegalArgumentException("Effectively-once processing semantics can only be achieved using a Failover subscription type"); - } - // window configs WindowConfig windowConfig = functionConfig.getWindowConfig(); if (null != windowLengthCount) { @@ -377,15 +342,7 @@ public class CmdFunctions extends CmdBase { } windowConfig.setSlidingIntervalDurationMs(slidingIntervalDurationMs); } - if (windowConfig != null) { - WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig); - // set auto ack to false since windowing framework is responsible - // for acking and not the function framework - if (autoAck != null && autoAck == true) { - throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality"); - } - functionConfig.setAutoAck(false); - } + functionConfig.setWindowConfig(windowConfig); if (null != autoAck) { @@ -394,188 +351,43 @@ public class CmdFunctions extends CmdBase { functionConfig.setAutoAck(true); } - inferMissingArguments(functionConfig); if (null != jarFile) { - doJavaSubmitChecks(functionConfig); functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); userCodeFile = jarFile; } else if (null != pyFile) { - doPythonSubmitChecks(functionConfig); functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON); userCodeFile = pyFile; } else { throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function"); } - } - private Class<?>[] getFunctionTypes(File file, FunctionConfig functionConfig) { - assertClassExistsInJar(file); - - Object userClass = Reflections.createInstance(functionConfig.getClassName(), file); - Class<?>[] typeArgs; - // if window function - if (functionConfig.getWindowConfig() != null) { - java.util.function.Function function = (java.util.function.Function) userClass; - if (function == null) { - throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated from jar %s", - functionConfig.getClassName(), jarFile)); - } - typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); - if (!typeArgs[0].equals(Collection.class)) { - throw new IllegalArgumentException("Window function must take a collection as input"); - } - Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass()); - Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0]; - Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0]; - typeArgs[0] = (Class<?>) actualInputType; - } else { - if (userClass instanceof Function) { - Function pulsarFunction = (Function) userClass; - if (pulsarFunction == null) { - throw new IllegalArgumentException(String.format("The Pulsar function class %s could not be instantiated from jar %s", - functionConfig.getClassName(), jarFile)); - } - typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass()); - } else { - java.util.function.Function function = (java.util.function.Function) userClass; - if (function == null) { - throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated from jar %s", - functionConfig.getClassName(), jarFile)); - } - typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); - } - } + // infer default vaues + inferMissingArguments(functionConfig); - return typeArgs; + // check if function configs are valid + validateFunctionConfigs(functionConfig); } - private void assertClassExistsInJar(File file) { - if (!Reflections.classExistsInJar(file, functionConfig.getClassName())) { - throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s", - functionConfig.getClassName(), jarFile)); - } else if (!Reflections.classInJarImplementsIface(file, functionConfig.getClassName(), Function.class) - && !Reflections.classInJarImplementsIface(file, functionConfig.getClassName(), java.util.function.Function.class)) { - throw new IllegalArgumentException(String.format("The Pulsar function class %s in jar %s implements neither org.apache.pulsar.functions.api.Function nor java.util.function.Function", - functionConfig.getClassName(), jarFile)); - } - } + private void validateFunctionConfigs(FunctionConfig functionConfig) { - private void doJavaSubmitChecks(FunctionConfig functionConfig) { - if (isNull(functionConfig.getClassName())) { - throw new IllegalArgumentException("You supplied a jar file but no main class"); - } - - File file = new File(jarFile); - ClassLoader userJarLoader; - try { - userJarLoader = Reflections.loadJar(file); - } catch (MalformedURLException e) { - throw new RuntimeException("Failed to load user jar " + file, e); - } - Class<?>[] typeArgs = getFunctionTypes(file, functionConfig); - // Check if the Input serialization/deserialization class exists in jar or already loaded and that it - // implements SerDe class - functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> { - if (!Reflections.classExists(inputSerializer) - && !Reflections.classExistsInJar(new File(jarFile), inputSerializer)) { - throw new IllegalArgumentException( - String.format("The input serialization/deserialization class %s does not exist", - inputSerializer)); - } else if (Reflections.classExists(inputSerializer)) { - if (!Reflections.classImplementsIface(inputSerializer, SerDe.class)) { - throw new IllegalArgumentException(String.format("The input serialization/deserialization class %s does not not implement %s", - inputSerializer, SerDe.class.getCanonicalName())); - } - } else if (Reflections.classExistsInJar(new File(jarFile), inputSerializer)) { - if (!Reflections.classInJarImplementsIface(new File(jarFile), inputSerializer, SerDe.class)) { - throw new IllegalArgumentException(String.format("The input serialization/deserialization class %s does not not implement %s", - inputSerializer, SerDe.class.getCanonicalName())); - } - } - if (inputSerializer.equals(DefaultSerDe.class.getName())) { - if (!DefaultSerDe.IsSupportedType(typeArgs[0])) { - throw new RuntimeException("The default Serializer does not support type " + typeArgs[0]); - } - } else { - SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, file); - if (serDe == null) { - throw new IllegalArgumentException(String.format("The SerDe class %s does not exist in jar %s", - inputSerializer, jarFile)); - } - Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); - - // type inheritance information seems to be lost in generic type - // load the actual type class for verification - Class<?> fnInputClass; - Class<?> serdeInputClass; - try { - fnInputClass = Class.forName(typeArgs[0].getName(), true, userJarLoader); - serdeInputClass = Class.forName(serDeTypes[0].getName(), true, userJarLoader); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Failed to load type class", e); - } - - if (!fnInputClass.isAssignableFrom(serdeInputClass)) { - throw new RuntimeException("Serializer type mismatch " + typeArgs[0] + " vs " + serDeTypes[0]); - } - } - }); - functionConfig.getInputs().forEach((topicName) -> { - if (!DefaultSerDe.IsSupportedType(typeArgs[0])) { - throw new RuntimeException("Default Serializer does not support type " + typeArgs[0]); - } - }); - if (!Void.class.equals(typeArgs[1])) { - if (functionConfig.getOutputSerdeClassName() == null - || functionConfig.getOutputSerdeClassName().isEmpty() - || functionConfig.getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) { - if (!DefaultSerDe.IsSupportedType(typeArgs[1])) { - throw new RuntimeException("Default Serializer does not support type " + typeArgs[1]); - } - } else { - SerDe serDe = (SerDe) Reflections.createInstance(functionConfig.getOutputSerdeClassName(), file); - if (serDe == null) { - throw new IllegalArgumentException(String.format("SerDe class %s does not exist in jar %s", - functionConfig.getOutputSerdeClassName(), jarFile)); - } - Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); - - // type inheritance information seems to be lost in generic type - // load the actual type class for verification - Class<?> fnOutputClass; - Class<?> serdeOutputClass; - try { - fnOutputClass = Class.forName(typeArgs[1].getName(), true, userJarLoader); - serdeOutputClass = Class.forName(serDeTypes[0].getName(), true, userJarLoader); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Failed to load type class", e); - } - - if (!serdeOutputClass.isAssignableFrom(fnOutputClass)) { - throw new RuntimeException("Serializer type mismatch " + typeArgs[1] + " vs " + serDeTypes[0]); - } + if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { + File file = new File(jarFile); + ClassLoader userJarLoader; + try { + userJarLoader = Reflections.loadJar(file); + } catch (MalformedURLException e) { + throw new RuntimeException("Failed to load user jar " + file, e); } - } - } - - private void doPythonSubmitChecks(FunctionConfig functionConfig) { - if (functionConfig.getClassName() == null) { - throw new IllegalArgumentException("You specified a Python file but no main class name"); - } - - if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { - throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python"); + // make sure the function class loader is accessible thread-locally + Thread.currentThread().setContextClassLoader(userJarLoader); } - if (functionConfig.getWindowConfig() != null) { - throw new IllegalArgumentException("There is currently no support windowing in python"); - } - } - - private void validateTopicName(String topic) { - if (!TopicName.isValid(topic)) { - throw new IllegalArgumentException(String.format("The topic name %s is invalid", topic)); + try { + // Need to load jar and set context class loader before calling + ConfigValidation.validateConfig(functionConfig); + } catch (Exception e) { + throw new ParameterException(e.getMessage()); } } @@ -592,6 +404,21 @@ public class CmdFunctions extends CmdBase { if (StringUtils.isEmpty(functionConfig.getOutput())) { inferMissingOutput(functionConfig); } + + if (functionConfig.getParallelism() == 0) { + functionConfig.setParallelism(1); + } + + WindowConfig windowConfig = functionConfig.getWindowConfig(); + if (windowConfig != null) { + WindowUtils.inferDefaultConfigs(windowConfig); + // set auto ack to false since windowing framework is responsible + // for acking and not the function framework + if (autoAck != null && autoAck == true) { + throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality"); + } + functionConfig.setAutoAck(false); + } } private void inferMissingFunctionName(FunctionConfig functionConfig) { @@ -649,7 +476,7 @@ public class CmdFunctions extends CmdBase { } catch (MalformedURLException e) { throw new RuntimeException("Failed to load user jar " + file, e); } - typeArgs = getFunctionTypes(file, functionConfig); + typeArgs = Utils.getFunctionTypes(functionConfig); } FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java index a82a195..8f16eac 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java @@ -25,6 +25,7 @@ import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.WindowConfig; +import org.apache.pulsar.functions.utils.validation.ValidatorImpls; import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy; import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy; import org.apache.pulsar.functions.windowing.evictors.WatermarkCountEvictionPolicy; @@ -93,7 +94,9 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { (new Gson().toJson(context.getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY).get())), WindowConfig.class); - WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig); + + WindowUtils.inferDefaultConfigs(windowConfig); + ValidatorImpls.WindowConfigValidator.validateWindowConfig(windowConfig); return windowConfig; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java index 6de61c2..73dda87 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java @@ -25,44 +25,7 @@ public class WindowUtils { return String.format("%s/%s/%s", tenant, namespace, name); } - public static void validateAndSetDefaultsWindowConfig(WindowConfig windowConfig) { - if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) { - throw new IllegalArgumentException("Window length is not specified"); - } - - if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) { - throw new IllegalArgumentException( - "Window length for time and count are set! Please set one or the other."); - } - - if (windowConfig.getWindowLengthCount() != null) { - if (windowConfig.getWindowLengthCount() <= 0) { - throw new IllegalArgumentException( - "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]"); - } - } - - if (windowConfig.getWindowLengthDurationMs() != null) { - if (windowConfig.getWindowLengthDurationMs() <= 0) { - throw new IllegalArgumentException( - "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]"); - } - } - - if (windowConfig.getSlidingIntervalCount() != null) { - if (windowConfig.getSlidingIntervalCount() <= 0) { - throw new IllegalArgumentException( - "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]"); - } - } - - if (windowConfig.getSlidingIntervalDurationMs() != null) { - if (windowConfig.getSlidingIntervalDurationMs() <= 0) { - throw new IllegalArgumentException( - "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]"); - } - } - + public static void inferDefaultConfigs(WindowConfig windowConfig) { if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getSlidingIntervalDurationMs() == null) { windowConfig.setSlidingIntervalDurationMs(windowConfig.getWindowLengthDurationMs()); } @@ -72,20 +35,10 @@ public class WindowUtils { } if (windowConfig.getTimestampExtractorClassName() != null) { - if (windowConfig.getMaxLagMs() != null) { - if (windowConfig.getMaxLagMs() <= 0) { - throw new IllegalArgumentException( - "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]"); - } - } else { + if (windowConfig.getMaxLagMs() == null) { windowConfig.setMaxLagMs(WindowFunctionExecutor.DEFAULT_MAX_LAG_MS); } - if (windowConfig.getWatermarkEmitIntervalMs() != null) { - if (windowConfig.getWatermarkEmitIntervalMs() <= 0) { - throw new IllegalArgumentException( - "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]"); - } - } else { + if (windowConfig.getWatermarkEmitIntervalMs() == null) { windowConfig.setWatermarkEmitIntervalMs(WindowFunctionExecutor.DEFAULT_WATERMARK_EVENT_INTERVAL_MS); } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java index 33565bc..c5d5d4a 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java @@ -537,8 +537,8 @@ public class WindowFunctionExecutorTest { if (arg0 == null) { Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs(), new Long(testWindowedPulsarFunction.DEFAULT_MAX_LAG_MS)); - } else if((Long) arg0 <= 0) { - fail(String.format("Window lag cannot be zero or less -- lagTime: %s", arg0)); + } else if((Long) arg0 < 0) { + fail(String.format("Window lag cannot be less than zero -- lagTime: %s", arg0)); } else { Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs().longValue(), maxLagMs.longValue()); diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml index b278ed7..01710b1 100644 --- a/pulsar-functions/utils/pom.xml +++ b/pulsar-functions/utils/pom.xml @@ -82,6 +82,11 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>net.jodah</groupId> + <artifactId>typetools</artifactId> + </dependency> + </dependencies> </project> diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index ffdda63..2940e32 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -23,10 +23,21 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.ToString; +import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidFunctionConfig; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName; +import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidWindowConfig; +import org.apache.pulsar.functions.utils.validation.ValidatorImpls; import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; import java.util.Map; @Getter @@ -34,6 +45,7 @@ import java.util.Map; @Data @EqualsAndHashCode @ToString +@isValidFunctionConfig public class FunctionConfig { public enum ProcessingGuarantees { @@ -61,25 +73,37 @@ public class FunctionConfig { PYTHON } + + @NotNull private String tenant; + @NotNull private String namespace; + @NotNull private String name; + @NotNull + @isImplementationOfClasses(implementsClasses = {Function.class, java.util.function.Function.class}) private String className; - + @isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class}) private Collection<String> inputs; + @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, + valueValidatorClasses = { ValidatorImpls.SerdeValidator.class }) private Map<String, String> customSerdeInputs; - + @isValidTopicName private String output; + @isImplementationOfClass(implementsClass = SerDe.class) private String outputSerdeClassName; - + @isValidTopicName private String logTopic; private ProcessingGuarantees processingGuarantees; private Map<String, Object> userConfig; private SubscriptionType subscriptionType; private Runtime runtime; private boolean autoAck; + @isPositiveNumber private int parallelism; + @isValidResources private Resources resources; private String fqfn; + @isValidWindowConfig private WindowConfig windowConfig; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 68ac86b..2963e57 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -21,17 +21,28 @@ package org.apache.pulsar.functions.utils; import com.google.protobuf.AbstractMessage.Builder; import com.google.protobuf.MessageOrBuilder; import com.google.protobuf.util.JsonFormat; + import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.net.ServerSocket; +import java.util.Collection; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.functions.api.Function; + +import net.jodah.typetools.TypeResolver; /** * Utils used for runtime. */ +@Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) public class Utils { @@ -76,4 +87,66 @@ public class Utils { throw new RuntimeException("No free port found", ex); } } + + public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig) { + + Object userClass = createInstance(functionConfig.getClassName(), Thread.currentThread().getContextClassLoader()); + + Class<?>[] typeArgs; + // if window function + if (functionConfig.getWindowConfig() != null) { + java.util.function.Function function = (java.util.function.Function) userClass; + if (function == null) { + throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated", + functionConfig.getClassName())); + } + typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); + if (!typeArgs[0].equals(Collection.class)) { + throw new IllegalArgumentException("Window function must take a collection as input"); + } + Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass()); + Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0]; + Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0]; + typeArgs[0] = (Class<?>) actualInputType; + } else { + if (userClass instanceof Function) { + Function pulsarFunction = (Function) userClass; + typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass()); + } else { + java.util.function.Function function = (java.util.function.Function) userClass; + typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); + } + } + + return typeArgs; + } + + public static Object createInstance(String userClassName, ClassLoader classLoader) { + Class<?> theCls; + try { + theCls = Class.forName(userClassName); + } catch (ClassNotFoundException cnfe) { + try { + theCls = Class.forName(userClassName, true, classLoader); + } catch (ClassNotFoundException e) { + throw new RuntimeException("User class must be in class path", cnfe); + } + } + Object result; + try { + Constructor<?> meth = theCls.getDeclaredConstructor(); + meth.setAccessible(true); + result = meth.newInstance(); + } catch (InstantiationException ie) { + throw new RuntimeException("User class must be concrete", ie); + } catch (NoSuchMethodException e) { + throw new RuntimeException("User class doesn't have such method", e); + } catch (IllegalAccessException e) { + throw new RuntimeException("User class must have a no-arg constructor", e); + } catch (InvocationTargetException e) { + throw new RuntimeException("User class constructor throws exception", e); + } + return result; + + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java new file mode 100644 index 0000000..b665d1a --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.validation; + +import lombok.extern.slf4j.Slf4j; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +@Slf4j +public class ConfigValidation { + + public static void validateConfig(Object config) { + for (Field field : config.getClass().getDeclaredFields()) { + Object value = null; + field.setAccessible(true); + try { + value = field.get(config); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + validateField(field, value); + } + validateClass(config); + } + + private static void validateClass(Object config) { + processAnnotations(config.getClass().getAnnotations(), config.getClass().getName(), config); + } + + private static void validateField(Field field, Object value) { + processAnnotations(field.getAnnotations(), field.getName(), value); + } + + private static void processAnnotations(Annotation[] annotations, String fieldName, Object value) { + try { + for (Annotation annotation : annotations) { + + String type = annotation.annotationType().getName(); + Class<?> validatorClass = null; + Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses(); + //check if annotation is one of our + for (Class<?> clazz : classes) { + if (clazz.getName().equals(type)) { + validatorClass = clazz; + break; + } + } + if (validatorClass != null) { + Object v = validatorClass.cast(annotation); + @SuppressWarnings("unchecked") + Class<Validator> clazz = (Class<Validator>) validatorClass + .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v); + Validator o = null; + Map<String, Object> params = getParamsFromAnnotation(validatorClass, v); + //two constructor signatures used to initialize validators. + //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor) + //If validator has a constructor that takes a Map as an argument call that constructor + if (hasConstructor(clazz, Map.class)) { + o = clazz.getConstructor(Map.class).newInstance(params); + } else { //If not call default constructor + o = clazz.newInstance(); + } + o.validateField(fieldName, value); + } + } + } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + private static Map<String, Object> getParamsFromAnnotation(Class<?> validatorClass, Object v) + throws InvocationTargetException, IllegalAccessException { + Map<String, Object> params = new HashMap<String, Object>(); + for (Method method : validatorClass.getDeclaredMethods()) { + + Object value = null; + try { + value = (Object) method.invoke(v); + } catch (IllegalArgumentException ex) { + value = null; + } + if (value != null) { + params.put(method.getName(), value); + } + } + return params; + } + + public static boolean hasConstructor(Class<?> clazz, Class<?> paramClass) { + Class<?>[] classes = { paramClass }; + try { + clazz.getConstructor(classes); + } catch (NoSuchMethodException e) { + return false; + } + return true; + } +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java new file mode 100644 index 0000000..f08cbba --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.validation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +public class ConfigValidationAnnotations { + + /** + * Validates on object is not null + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface NotNull { + Class<?> validatorClass() default ValidatorImpls.NotNullValidator.class; + } + + /** + * Checks if a number is positive and whether zero inclusive Validator with fields: validatorClass, includeZero + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isPositiveNumber { + Class<?> validatorClass() default ValidatorImpls.PositiveNumberValidator.class; + + boolean includeZero() default false; + } + + + /** + * Checks if resources specified are valid + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isValidResources { + + Class<?> validatorClass() default ValidatorImpls.ResourcesValidator.class; + } + + /** + * validates each entry in a list is of a certain type + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isListEntryType { + Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class; + + Class<?> type(); + } + + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isStringList { + Class<?> validatorClass() default ValidatorImpls.ListEntryTypeValidator.class; + + Class<?> type() default String.class; + } + + /** + * Validates each entry in a list with a list of validators Validators with fields: validatorClass and entryValidatorClass + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isListEntryCustom { + Class<?> validatorClass() default ValidatorImpls.ListEntryCustomValidator.class; + + Class<?>[] entryValidatorClasses(); + } + + + /** + * Validates the type of each key and value in a map Validator with fields: validatorClass, keyValidatorClass, valueValidatorClass + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isMapEntryType { + Class<?> validatorClass() default ValidatorImpls.MapEntryTypeValidator.class; + + Class<?> keyType(); + + Class<?> valueType(); + } + + /** + * Checks if class name is assignable to the provided class/interfaces + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isImplementationOfClass { + Class<?> validatorClass() default ValidatorImpls.ImplementsClassValidator.class; + + Class<?> implementsClass(); + } + + /** + * Checks if class name is assignable to ONE of the provided list class/interfaces + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isImplementationOfClasses { + Class<?> validatorClass() default ValidatorImpls.ImplementsClassesValidator.class; + + Class<?>[] implementsClasses(); + } + + /** + * Validates a each key and value in a Map with a list of validators Validator with fields: validatorClass, keyValidatorClasses, + * valueValidatorClasses + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isMapEntryCustom { + Class<?> validatorClass() default ValidatorImpls.MapEntryCustomValidator.class; + + Class<?>[] keyValidatorClasses(); + + Class<?>[] valueValidatorClasses(); + } + + /** + * checks if the topic name is valid + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isValidTopicName { + Class<?> validatorClass() default ValidatorImpls.TopicNameValidator.class; + } + + /** + * checks if window configs is valid + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.FIELD) + public @interface isValidWindowConfig { + Class<?> validatorClass() default ValidatorImpls.WindowConfigValidator.class; + } + + /** + * checks function config as a whole to make sure all fields are valid + */ + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.TYPE}) + public @interface isValidFunctionConfig { + Class<?> validatorClass() default ValidatorImpls.FunctionConfigValidator.class; + } + + /** + * Field names for annotations + */ + public static class ValidatorParams { + static final String VALIDATOR_CLASS = "validatorClass"; + static final String TYPE = "type"; + static final String BASE_TYPE = "baseType"; + static final String ENTRY_VALIDATOR_CLASSES = "entryValidatorClasses"; + static final String KEY_VALIDATOR_CLASSES = "keyValidatorClasses"; + static final String VALUE_VALIDATOR_CLASSES = "valueValidatorClasses"; + static final String KEY_TYPE = "keyType"; + static final String VALUE_TYPE = "valueType"; + static final String INCLUDE_ZERO = "includeZero"; + static final String ACCEPTED_VALUES = "acceptedValues"; + static final String IMPLEMENTS_CLASS = "implementsClass"; + static final String IMPLEMENTS_CLASSES = "implementsClasses"; + } +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java new file mode 100644 index 0000000..f81cf06 --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.validation; + +import java.util.Map; + +public class ConfigValidationUtils { + /** + * Returns a new NestableFieldValidator for a given class. + * + * @param cls the Class the field should be a type of + * @param notNull whether or not a value of null is valid + * @return a NestableFieldValidator for that class + */ + public static NestableFieldValidator fv(final Class cls, final boolean notNull) { + return new NestableFieldValidator() { + @Override + public void validateField(String pd, String name, Object field) + throws IllegalArgumentException { + if (field == null) { + if (notNull) { + throw new IllegalArgumentException("Field " + name + " must not be null"); + } else { + return; + } + } + if (!cls.isInstance(field)) { + throw new IllegalArgumentException( + pd + name + " must be a " + cls.getName() + ". (" + field + ")"); + } + } + }; + } + + /** + * Returns a new NestableFieldValidator for a List of the given Class. + * + * @param cls the Class of elements composing the list + * @param notNull whether or not a value of null is valid + * @return a NestableFieldValidator for a list of the given class + */ + public static NestableFieldValidator listFv(Class cls, boolean notNull) { + return listFv(fv(cls, notNull), notNull); + } + + /** + * Returns a new NestableFieldValidator for a List where each item is validated by validator. + * + * @param validator used to validate each item in the list + * @param notNull whether or not a value of null is valid + * @return a NestableFieldValidator for a list with each item validated by a different validator. + */ + public static NestableFieldValidator listFv(final NestableFieldValidator validator, + final boolean notNull) { + return new NestableFieldValidator() { + @Override + public void validateField(String pd, String name, Object field) + throws IllegalArgumentException { + + if (field == null) { + if (notNull) { + throw new IllegalArgumentException("Field " + name + " must not be null"); + } else { + return; + } + } + if (field instanceof Iterable) { + for (Object e : (Iterable) field) { + validator.validateField(pd + "Each element of the list ", name, e); + } + return; + } + throw new IllegalArgumentException( + "Field " + name + " must be an Iterable but was " + + ((field == null) ? "null" : ("a " + field.getClass()))); + } + }; + } + + /** + * Returns a new NestableFieldValidator for a Map of key to val. + * + * @param key the Class of keys in the map + * @param val the Class of values in the map + * @param notNull whether or not a value of null is valid + * @return a NestableFieldValidator for a Map of key to val + */ + public static NestableFieldValidator mapFv(Class key, Class val, + boolean notNull) { + return mapFv(fv(key, false), fv(val, false), notNull); + } + + /** + * Returns a new NestableFieldValidator for a Map. + * + * @param key a validator for the keys in the map + * @param val a validator for the values in the map + * @param notNull whether or not a value of null is valid + * @return a NestableFieldValidator for a Map + */ + public static NestableFieldValidator mapFv(final NestableFieldValidator key, + final NestableFieldValidator val, final boolean notNull) { + return new NestableFieldValidator() { + @SuppressWarnings("unchecked") + @Override + public void validateField(String pd, String name, Object field) + throws IllegalArgumentException { + if (field == null) { + if (notNull) { + throw new IllegalArgumentException("Field " + name + " must not be null"); + } else { + return; + } + } + if (field instanceof Map) { + for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) field).entrySet()) { + key.validateField("Each key of the map ", name, entry.getKey()); + val.validateField("Each value in the map ", name, entry.getValue()); + } + return; + } + throw new IllegalArgumentException( + "Field " + name + " must be a Map"); + } + }; + } + + /** + * Declares methods for validating configuration values. + */ + public static interface FieldValidator { + /** + * Validates the given field. + * + * @param name the name of the field. + * @param field The field to be validated. + * @throws IllegalArgumentException if the field fails validation. + */ + public void validateField(String name, Object field) throws IllegalArgumentException; + } + + /** + * Declares a method for validating configuration values that is nestable. + */ + public static abstract class NestableFieldValidator implements FieldValidator { + @Override + public void validateField(String name, Object field) throws IllegalArgumentException { + validateField(null, name, field); + } + + /** + * Validates the given field. + * + * @param pd describes the parent wrapping this validator. + * @param name the name of the field. + * @param field The field to be validated. + * @throws IllegalArgumentException if the field fails validation. + */ + public abstract void validateField(String pd, String name, Object field) throws IllegalArgumentException; + } +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java new file mode 100644 index 0000000..5941048 --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.validation; + +import java.util.Map; + +public abstract class Validator { + public Validator(Map<String, Object> params) { + } + + public Validator() { + } + + public abstract void validateField(String name, Object o); +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java new file mode 100644 index 0000000..64c23f5 --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -0,0 +1,655 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.validation; + +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.utils.FunctionConfig; +import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.Resources; +import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.functions.utils.WindowConfig; + +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; + +@Slf4j +public class ValidatorImpls { + /** + * Validates a positive number. + */ + public static class PositiveNumberValidator extends Validator { + + private boolean includeZero; + + public PositiveNumberValidator() { + this.includeZero = false; + } + + public PositiveNumberValidator(Map<String, Object> params) { + this.includeZero = (boolean) params.get(ConfigValidationAnnotations.ValidatorParams.INCLUDE_ZERO); + } + + public static void validateField(String name, boolean includeZero, Object o) { + if (o == null) { + return; + } + if (o instanceof Number) { + if (includeZero) { + if (((Number) o).doubleValue() >= 0.0) { + return; + } + } else { + if (((Number) o).doubleValue() > 0.0) { + return; + } + } + } + throw new IllegalArgumentException(String.format("Field '%s' must be a Positive Number", name)); + } + + @Override + public void validateField(String name, Object o) { + validateField(name, this.includeZero, o); + } + } + + /** + * Validates if an object is not null. + */ + + public static class NotNullValidator extends Validator { + + @Override + public void validateField(String name, Object o) { + if (o == null) { + throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name)); + } + } + } + + @NoArgsConstructor + public static class ResourcesValidator extends Validator { + @Override + public void validateField(String name, Object o) { + if (o == null) { + throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name)); + } + + if (o instanceof Resources) { + Resources resources = (Resources) o; + Double cpu = resources.getCpu(); + Long ram = resources.getRam(); + Long disk = resources.getDisk(); + com.google.common.base.Preconditions.checkArgument(cpu == null || cpu > 0.0, + "The cpu allocation for the function must be positive"); + com.google.common.base.Preconditions.checkArgument(ram == null || ram > 0L, + "The ram allocation for the function must be positive"); + com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0L, + "The disk allocation for the function must be positive"); + } else { + throw new IllegalArgumentException(String.format("Field '%s' must be of Resource type!", name)); + } + } + } + + /** + * Validates each entry in a list. + */ + public static class ListEntryTypeValidator extends Validator { + + private Class<?> type; + + public ListEntryTypeValidator(Map<String, Object> params) { + this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE); + } + + public static void validateField(String name, Class<?> type, Object o) { + ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.listFv(type, false); + validator.validateField(name, o); + } + + @Override + public void validateField(String name, Object o) { + validateField(name, this.type, o); + } + } + + /** + * validates each key and value in a map of a certain type. + */ + public static class MapEntryTypeValidator extends Validator { + + private Class<?> keyType; + private Class<?> valueType; + + public MapEntryTypeValidator(Map<String, Object> params) { + this.keyType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_TYPE); + this.valueType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_TYPE); + } + + public static void validateField(String name, Class<?> keyType, Class<?> valueType, Object o) { + ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(keyType, valueType, false); + validator.validateField(name, o); + } + + @Override + public void validateField(String name, Object o) { + validateField(name, this.keyType, this.valueType, o); + } + } + + public static class ImplementsClassValidator extends Validator { + + Class<?> classImplements; + + public ImplementsClassValidator(Map<String, Object> params) { + this.classImplements = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS); + } + + public ImplementsClassValidator(Class<?> classImplements) { + this.classImplements = classImplements; + } + + @Override + public void validateField(String name, Object o) { + if (o == null) { + return; + } + SimpleTypeValidator.validateField(name, String.class, o); + String className = (String) o; + try { + ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); + Class<?> objectClass = clsLoader.loadClass(className); + if (!this.classImplements.isAssignableFrom(objectClass)) { + throw new IllegalArgumentException( + String.format("Field '%s' with value '%s' does not implement %s ", + name, o, this.classImplements.getName())); + } + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + } + + /** + * validates class implements one of these classes + */ + public static class ImplementsClassesValidator extends Validator { + + Class<?>[] classesImplements; + + public ImplementsClassesValidator(Map<String, Object> params) { + this.classesImplements = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASSES); + } + + @Override + public void validateField(String name, Object o) { + if (o == null) { + return; + } + SimpleTypeValidator.validateField(name, String.class, o); + String className = (String) o; + int count = 0; + for (Class<?> classImplements : classesImplements) { + Class<?> objectClass = null; + try { + objectClass = loadClass(className); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Cannot find/load class " + className); + } + + if (classImplements.isAssignableFrom(objectClass)) { + count++; + } + } + if (count == 0) { + throw new IllegalArgumentException( + String.format("Field '%s' with value '%s' does not implement any of these classes %s", + name, o, classesImplements)); + } + } + } + + @NoArgsConstructor + public static class SerdeValidator extends Validator { + + @Override + public void validateField(String name, Object o) { + new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, o); + } + } + + /** + * validates each key and each value against the respective arrays of validators. + */ + public static class MapEntryCustomValidator extends Validator { + + private Class<?>[] keyValidators; + private Class<?>[] valueValidators; + + public MapEntryCustomValidator(Map<String, Object> params) { + this.keyValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_VALIDATOR_CLASSES); + this.valueValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES); + } + + @SuppressWarnings("unchecked") + public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o) + throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { + if (o == null) { + return; + } + //check if Map + SimpleTypeValidator.validateField(name, Map.class, o); + for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) o).entrySet()) { + for (Class<?> kv : keyValidators) { + Object keyValidator = kv.getConstructor().newInstance(); + if (keyValidator instanceof Validator) { + ((Validator) keyValidator).validateField(name + " Map key", entry.getKey()); + } else { + log.warn( + "validator: {} cannot be used in MapEntryCustomValidator to validate keys. Individual entry validators must " + + "a instance of Validator class", + kv.getName()); + } + } + for (Class<?> vv : valueValidators) { + Object valueValidator = vv.getConstructor().newInstance(); + if (valueValidator instanceof Validator) { + ((Validator) valueValidator).validateField(name + " Map value", entry.getValue()); + } else { + log.warn( + "validator: {} cannot be used in MapEntryCustomValidator to validate values. Individual entry validators " + + "must a instance of Validator class", + vv.getName()); + } + } + } + } + + @Override + public void validateField(String name, Object o) { + try { + validateField(name, this.keyValidators, this.valueValidators, o); + } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + } + + @NoArgsConstructor + public static class StringValidator extends Validator { + + private HashSet<String> acceptedValues = null; + + public StringValidator(Map<String, Object> params) { + + this.acceptedValues = + new HashSet<String>(Arrays.asList((String[]) params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES))); + + if (this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 1 && this.acceptedValues.contains(""))) { + this.acceptedValues = null; + } + } + + @Override + public void validateField(String name, Object o) { + SimpleTypeValidator.validateField(name, String.class, o); + if (this.acceptedValues != null) { + if (!this.acceptedValues.contains((String) o)) { + throw new IllegalArgumentException( + "Field " + name + " is not an accepted value. Value: " + o + " Accepted values: " + this.acceptedValues); + } + } + } + } + @NoArgsConstructor + public static class FunctionConfigValidator extends Validator { + + private static void doJavaChecks(FunctionConfig functionConfig, String name) { + Class<?>[] typeArgs = Utils.getFunctionTypes(functionConfig); + + ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); + // Check if the Input serialization/deserialization class exists in jar or already loaded and that it + // implements SerDe class + functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> { + + + Class<?> serdeClass; + try { + serdeClass = loadClass(inputSerializer); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException( + String.format("The input serialization/deserialization class %s does not exist", + inputSerializer)); + } + + try { + new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, inputSerializer); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException( + String.format("The input serialization/deserialization class %s does not not implement %s", + + inputSerializer, SerDe.class.getCanonicalName())); + } + + if (inputSerializer.equals(DefaultSerDe.class.getName())) { + if (!DefaultSerDe.IsSupportedType(typeArgs[0])) { + throw new IllegalArgumentException("The default Serializer does not support type " + + typeArgs[0]); + } + } else { + SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader); + if (serDe == null) { + throw new IllegalArgumentException(String.format("The SerDe class %s does not exist", + inputSerializer)); + } + Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); + + // type inheritance information seems to be lost in generic type + // load the actual type class for verification + Class<?> fnInputClass; + Class<?> serdeInputClass; + try { + fnInputClass = Class.forName(typeArgs[0].getName(), true, clsLoader); + serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Failed to load type class", e); + } + + if (!fnInputClass.isAssignableFrom(serdeInputClass)) { + throw new IllegalArgumentException("Serializer type mismatch " + typeArgs[0] + " vs " + serDeTypes[0]); + } + } + }); + functionConfig.getInputs().forEach((topicName) -> { + if (!DefaultSerDe.IsSupportedType(typeArgs[0])) { + throw new RuntimeException("Default Serializer does not support type " + typeArgs[0]); + } + }); + if (!Void.class.equals(typeArgs[1])) { + if (functionConfig.getOutputSerdeClassName() == null + || functionConfig.getOutputSerdeClassName().isEmpty() + || functionConfig.getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) { + if (!DefaultSerDe.IsSupportedType(typeArgs[1])) { + throw new RuntimeException("Default Serializer does not support type " + typeArgs[1]); + } + } else { + SerDe serDe = (SerDe) Reflections.createInstance(functionConfig.getOutputSerdeClassName(), + clsLoader); + if (serDe == null) { + throw new IllegalArgumentException(String.format("SerDe class %s does not exist", + functionConfig.getOutputSerdeClassName())); + } + Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); + + // type inheritance information seems to be lost in generic type + // load the actual type class for verification + Class<?> fnOutputClass; + Class<?> serdeOutputClass; + try { + fnOutputClass = Class.forName(typeArgs[1].getName(), true, clsLoader); + serdeOutputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to load type class", e); + } + + if (!serdeOutputClass.isAssignableFrom(fnOutputClass)) { + throw new RuntimeException("Serializer type mismatch " + typeArgs[1] + " vs " + serDeTypes[0]); + } + } + } + } + + private static void doPythonChecks(FunctionConfig functionConfig, String name) { + if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python"); + } + + if (functionConfig.getWindowConfig() != null) { + throw new IllegalArgumentException("There is currently no support windowing in python"); + } + } + + private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException { + if (inputTopics.contains(outputTopic)) { + throw new IllegalArgumentException( + String.format("Output topic %s is also being used as an input topic (topics must be one or the other)", + outputTopic)); + } + } + + private static void doCommonChecks(FunctionConfig functionConfig) { + if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) { + throw new RuntimeException("No input topic(s) specified for the function"); + } + + // Ensure that topics aren't being used as both input and output + verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput()); + + if (functionConfig.getSubscriptionType() != null + && functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER + && functionConfig.getProcessingGuarantees() != null + && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + throw new IllegalArgumentException("Effectively-once processing semantics can only be achieved using a Failover subscription type"); + } + + WindowConfig windowConfig = functionConfig.getWindowConfig(); + if (windowConfig != null) { + // set auto ack to false since windowing framework is responsible + // for acking and not the function framework + if (functionConfig.isAutoAck() == true) { + throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality"); + } + functionConfig.setAutoAck(false); + } + } + + @Override + public void validateField(String name, Object o) { + FunctionConfig functionConfig = (FunctionConfig) o; + doCommonChecks(functionConfig); + if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { + doJavaChecks(functionConfig, name); + } else { + doPythonChecks(functionConfig, name); + } + } + } + + /** + * Validates each entry in a list against a list of custom Validators. Each validator in the list of validators must inherit or be an + * instance of Validator class + */ + public static class ListEntryCustomValidator extends Validator { + + private Class<?>[] entryValidators; + + public ListEntryCustomValidator(Map<String, Object> params) { + this.entryValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES); + } + + public static void validateField(String name, Class<?>[] validators, Object o) + throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { + if (o == null) { + return; + } + //check if iterable + SimpleTypeValidator.validateField(name, Iterable.class, o); + for (Object entry : (Iterable<?>) o) { + for (Class<?> validator : validators) { + Object v = validator.getConstructor().newInstance(); + if (v instanceof Validator) { + ((Validator) v).validateField(name + " list entry", entry); + } else { + log.warn( + "validator: {} cannot be used in ListEntryCustomValidator. Individual entry validators must a instance of " + + "Validator class", + validator.getName()); + } + } + } + } + + @Override + public void validateField(String name, Object o) { + try { + validateField(name, this.entryValidators, o); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) { + throw new RuntimeException(e); + } + } + } + + @NoArgsConstructor + public static class TopicNameValidator extends Validator { + + @Override + public void validateField(String name, Object o) { + if (o == null) { + return; + } + new StringValidator().validateField(name, o); + String topic = (String) o; + if (!TopicName.isValid(topic)) { + throw new IllegalArgumentException( + String.format("The topic name %s is invalid for field '%s'", topic, name)); + } + } + } + + public static class WindowConfigValidator extends Validator{ + + public static void validateWindowConfig(WindowConfig windowConfig) { + if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) { + throw new IllegalArgumentException("Window length is not specified"); + } + + if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) { + throw new IllegalArgumentException( + "Window length for time and count are set! Please set one or the other."); + } + + if (windowConfig.getWindowLengthCount() != null) { + if (windowConfig.getWindowLengthCount() <= 0) { + throw new IllegalArgumentException( + "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]"); + } + } + + if (windowConfig.getWindowLengthDurationMs() != null) { + if (windowConfig.getWindowLengthDurationMs() <= 0) { + throw new IllegalArgumentException( + "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]"); + } + } + + if (windowConfig.getSlidingIntervalCount() != null) { + if (windowConfig.getSlidingIntervalCount() <= 0) { + throw new IllegalArgumentException( + "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]"); + } + } + + if (windowConfig.getSlidingIntervalDurationMs() != null) { + if (windowConfig.getSlidingIntervalDurationMs() <= 0) { + throw new IllegalArgumentException( + "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]"); + } + } + + if (windowConfig.getTimestampExtractorClassName() != null) { + if (windowConfig.getMaxLagMs() != null) { + if (windowConfig.getMaxLagMs() < 0) { + throw new IllegalArgumentException( + "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]"); + } + } + if (windowConfig.getWatermarkEmitIntervalMs() != null) { + if (windowConfig.getWatermarkEmitIntervalMs() <= 0) { + throw new IllegalArgumentException( + "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]"); + } + } + } + } + + @Override + public void validateField(String name, Object o) { + if (o == null) { + return; + } + if (!(o instanceof WindowConfig)) { + throw new IllegalArgumentException(String.format("Field '%s' must be of WindowConfig type!", name)); + } + WindowConfig windowConfig = (WindowConfig) o; + validateWindowConfig(windowConfig); + } + } + + /** + * Validates basic types. + */ + public static class SimpleTypeValidator extends Validator { + + private Class<?> type; + + public SimpleTypeValidator(Map<String, Object> params) { + this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE); + } + + public static void validateField(String name, Class<?> type, Object o) { + if (o == null) { + return; + } + if (type.isInstance(o)) { + return; + } + throw new IllegalArgumentException( + "Field " + name + " must be of type " + type + ". Object: " + o + " actual type: " + o.getClass()); + } + + @Override + public void validateField(String name, Object o) { + validateField(name, this.type, o); + } + } + + private static Class<?> loadClass(String className) throws ClassNotFoundException { + Class<?> objectClass; + try { + objectClass = Class.forName(className); + } catch (ClassNotFoundException e) { + ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); + if (clsLoader != null) { + objectClass = clsLoader.loadClass(className); + } else { + throw e; + } + } + return objectClass; + } +} -- To stop receiving notification emails like this one, please contact jerryp...@apache.org.