This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ecec933  improving config validation (#1859)
ecec933 is described below

commit ecec9337094820c65fe5ecf128f8070a3538bddd
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Wed May 30 12:51:11 2018 -0700

    improving config validation (#1859)
    
    * improving config validation
    
    * removing unnecessary file
    
    * removing unnecessary log
    
    * fix bug
    
    * fix potential NPE
---
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |   7 +-
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 303 ++--------
 .../windowing/WindowFunctionExecutor.java          |   5 +-
 .../pulsar/functions/windowing/WindowUtils.java    |  53 +-
 .../windowing/WindowFunctionExecutorTest.java      |   4 +-
 pulsar-functions/utils/pom.xml                     |   5 +
 .../pulsar/functions/utils/FunctionConfig.java     |  34 +-
 .../org/apache/pulsar/functions/utils/Utils.java   |  73 +++
 .../utils/validation/ConfigValidation.java         | 119 ++++
 .../validation/ConfigValidationAnnotations.java    | 183 ++++++
 .../utils/validation/ConfigValidationUtils.java    | 177 ++++++
 .../functions/utils/validation/Validator.java      |  31 +
 .../functions/utils/validation/ValidatorImpls.java | 655 +++++++++++++++++++++
 13 files changed, 1352 insertions(+), 297 deletions(-)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index de4ab40..1cc239d 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -86,7 +86,12 @@ public class CmdFunctionsTest {
     private Functions functions;
     private CmdFunctions cmd;
 
-    public class DummyFunction implements Function<String, String> {
+    public static class DummyFunction implements Function<String, String> {
+
+        public DummyFunction() {
+
+        }
+
         @Override
         public String process(String input, Context context) throws Exception {
             return null;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index d565568..94e17ad 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -18,14 +18,8 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.isNull;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,31 +29,12 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.net.MalformedURLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
+import net.jodah.typetools.TypeResolver;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
@@ -70,10 +45,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
@@ -87,10 +59,33 @@ import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
 import org.apache.pulsar.functions.windowing.WindowUtils;
 
-import net.jodah.typetools.TypeResolver;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.isNull;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions 
(lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -228,7 +223,7 @@ public class CmdFunctions extends CmdBase {
         @Parameter(names = "--userConfig", description = "User-defined config 
key/values")
         protected String userConfigString;
         @Parameter(names = "--parallelism", description = "The function's 
parallelism factor (i.e. the number of function instances to run)")
-        protected String parallelism;
+        protected Integer parallelism;
         @Parameter(names = "--cpu", description = "The cpu in cores that need 
to be allocated per function instance(applicable only to docker runtime)")
         protected Double cpu;
         @Parameter(names = "--ram", description = "The ram in bytes that need 
to be allocated per function instance(applicable only to process/docker 
runtime)")
@@ -275,19 +270,14 @@ public class CmdFunctions extends CmdBase {
 
             if (null != inputs) {
                 List<String> inputTopics = Arrays.asList(inputs.split(","));
-                inputTopics.forEach(this::validateTopicName);
                 functionConfig.setInputs(inputTopics);
             }
             if (null != customSerdeInputString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, String> customSerdeInputMap = new 
Gson().fromJson(customSerdeInputString, type);
-                customSerdeInputMap.forEach((topic, serde) -> {
-                    validateTopicName(topic);
-                });
                 functionConfig.setCustomSerdeInputs(customSerdeInputMap);
             }
             if (null != output) {
-                validateTopicName(output);
                 functionConfig.setOutput(output);
             }
             if (null != logTopic) {
@@ -320,37 +310,12 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setUserConfig(new HashMap<>());
             }
 
-            if (functionConfig.getInputs().isEmpty() && 
functionConfig.getCustomSerdeInputs().isEmpty()) {
-                throw new RuntimeException("No input topic(s) specified for 
the function");
-            }
-
-            // Ensure that topics aren't being used as both input and output
-            verifyNoTopicClash(functionConfig.getInputs(), 
functionConfig.getOutput());
-
-            if (parallelism == null) {
-                if (functionConfig.getParallelism() == 0) {
-                    functionConfig.setParallelism(1);
-                }
-            } else {
-                int num = Integer.parseInt(parallelism);
-                if (num <= 0) {
-                    throw new IllegalArgumentException("The parallelism factor 
(the number of instances) for the function must be positive");
-                }
-                functionConfig.setParallelism(num);
+            if (parallelism != null) {
+                functionConfig.setParallelism(parallelism);
             }
 
-            com.google.common.base.Preconditions.checkArgument(cpu == null || 
cpu > 0, "The cpu allocation for the function must be positive");
-            com.google.common.base.Preconditions.checkArgument(ram == null || 
ram > 0, "The ram allocation for the function must be positive");
-            com.google.common.base.Preconditions.checkArgument(disk == null || 
disk > 0, "The disk allocation for the function must be positive");
             functionConfig.setResources(new 
org.apache.pulsar.functions.utils.Resources(cpu, ram, disk));
 
-            if (functionConfig.getSubscriptionType() != null
-                    && functionConfig.getSubscriptionType() != 
FunctionConfig.SubscriptionType.FAILOVER
-                    && functionConfig.getProcessingGuarantees() != null
-                    && functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                throw new IllegalArgumentException("Effectively-once 
processing semantics can only be achieved using a Failover subscription type");
-            }
-
             // window configs
             WindowConfig windowConfig = functionConfig.getWindowConfig();
             if (null != windowLengthCount) {
@@ -377,15 +342,7 @@ public class CmdFunctions extends CmdBase {
                 }
                 
windowConfig.setSlidingIntervalDurationMs(slidingIntervalDurationMs);
             }
-            if (windowConfig != null) {
-                WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig);
-                // set auto ack to false since windowing framework is 
responsible
-                // for acking and not the function framework
-                if (autoAck != null && autoAck == true) {
-                    throw new IllegalArgumentException("Cannot enable auto ack 
when using windowing functionality");
-                }
-                functionConfig.setAutoAck(false);
-            }
+
             functionConfig.setWindowConfig(windowConfig);
 
             if  (null != autoAck) {
@@ -394,188 +351,43 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setAutoAck(true);
             }
 
-            inferMissingArguments(functionConfig);
 
             if (null != jarFile) {
-                doJavaSubmitChecks(functionConfig);
                 functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
                 userCodeFile = jarFile;
             } else if (null != pyFile) {
-                doPythonSubmitChecks(functionConfig);
                 functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
                 userCodeFile = pyFile;
             } else {
                 throw new RuntimeException("Either a Java jar or a Python file 
needs to be specified for the function");
             }
-        }
 
-        private Class<?>[] getFunctionTypes(File file, FunctionConfig 
functionConfig) {
-            assertClassExistsInJar(file);
-
-            Object userClass = 
Reflections.createInstance(functionConfig.getClassName(), file);
-            Class<?>[] typeArgs;
-            // if window function
-            if (functionConfig.getWindowConfig() != null) {
-                java.util.function.Function function = 
(java.util.function.Function) userClass;
-                if (function == null) {
-                    throw new IllegalArgumentException(String.format("The Java 
util function class %s could not be instantiated from jar %s",
-                            functionConfig.getClassName(), jarFile));
-                }
-                typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, 
function.getClass());
-                if (!typeArgs[0].equals(Collection.class)) {
-                    throw new IllegalArgumentException("Window function must 
take a collection as input");
-                }
-                Type type = 
TypeResolver.resolveGenericType(java.util.function.Function.class, 
function.getClass());
-                Type collectionType = ((ParameterizedType) 
type).getActualTypeArguments()[0];
-                Type actualInputType = ((ParameterizedType) 
collectionType).getActualTypeArguments()[0];
-                typeArgs[0] = (Class<?>) actualInputType;
-            } else {
-                if (userClass instanceof Function) {
-                    Function pulsarFunction = (Function) userClass;
-                    if (pulsarFunction == null) {
-                        throw new IllegalArgumentException(String.format("The 
Pulsar function class %s could not be instantiated from jar %s",
-                                functionConfig.getClassName(), jarFile));
-                    }
-                    typeArgs = 
TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
-                } else {
-                    java.util.function.Function function = 
(java.util.function.Function) userClass;
-                    if (function == null) {
-                        throw new IllegalArgumentException(String.format("The 
Java util function class %s could not be instantiated from jar %s",
-                                functionConfig.getClassName(), jarFile));
-                    }
-                    typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, 
function.getClass());
-                }
-            }
+            // infer default values
+            inferMissingArguments(functionConfig);
 
-            return typeArgs;
+            // check if function configs are valid
+            validateFunctionConfigs(functionConfig);
         }
 
-        private void assertClassExistsInJar(File file) {
-            if (!Reflections.classExistsInJar(file, 
functionConfig.getClassName())) {
-                throw new IllegalArgumentException(String.format("Pulsar 
function class %s does not exist in jar %s",
-                        functionConfig.getClassName(), jarFile));
-            } else if (!Reflections.classInJarImplementsIface(file, 
functionConfig.getClassName(), Function.class)
-                    && !Reflections.classInJarImplementsIface(file, 
functionConfig.getClassName(), java.util.function.Function.class)) {
-                throw new IllegalArgumentException(String.format("The Pulsar 
function class %s in jar %s implements neither 
org.apache.pulsar.functions.api.Function nor java.util.function.Function",
-                        functionConfig.getClassName(), jarFile));
-            }
-        }
+        private void validateFunctionConfigs(FunctionConfig functionConfig) {
 
-        private void doJavaSubmitChecks(FunctionConfig functionConfig) {
-            if (isNull(functionConfig.getClassName())) {
-                throw new IllegalArgumentException("You supplied a jar file 
but no main class");
-            }
-
-            File file = new File(jarFile);
-            ClassLoader userJarLoader;
-            try {
-                userJarLoader = Reflections.loadJar(file);
-            } catch (MalformedURLException e) {
-                throw new RuntimeException("Failed to load user jar " + file, 
e);
-            }
-            Class<?>[] typeArgs = getFunctionTypes(file, functionConfig);
-            // Check if the Input serialization/deserialization class exists 
in jar or already loaded and that it
-            // implements SerDe class
-            functionConfig.getCustomSerdeInputs().forEach((topicName, 
inputSerializer) -> {
-                if (!Reflections.classExists(inputSerializer)
-                        && !Reflections.classExistsInJar(new File(jarFile), 
inputSerializer)) {
-                    throw new IllegalArgumentException(
-                            String.format("The input 
serialization/deserialization class %s does not exist",
-                                    inputSerializer));
-                } else if (Reflections.classExists(inputSerializer)) {
-                    if (!Reflections.classImplementsIface(inputSerializer, 
SerDe.class)) {
-                        throw new IllegalArgumentException(String.format("The 
input serialization/deserialization class %s does not not implement %s",
-                                inputSerializer, 
SerDe.class.getCanonicalName()));
-                    }
-                } else if (Reflections.classExistsInJar(new File(jarFile), 
inputSerializer)) {
-                    if (!Reflections.classInJarImplementsIface(new 
File(jarFile), inputSerializer, SerDe.class)) {
-                        throw new IllegalArgumentException(String.format("The 
input serialization/deserialization class %s does not not implement %s",
-                                inputSerializer, 
SerDe.class.getCanonicalName()));
-                    }
-                }
-                if (inputSerializer.equals(DefaultSerDe.class.getName())) {
-                    if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
-                        throw new RuntimeException("The default Serializer 
does not support type " + typeArgs[0]);
-                    }
-                } else {
-                    SerDe serDe = (SerDe) 
Reflections.createInstance(inputSerializer, file);
-                    if (serDe == null) {
-                        throw new IllegalArgumentException(String.format("The 
SerDe class %s does not exist in jar %s",
-                                inputSerializer, jarFile));
-                    }
-                    Class<?>[] serDeTypes = 
TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
-                    // type inheritance information seems to be lost in 
generic type
-                    // load the actual type class for verification
-                    Class<?> fnInputClass;
-                    Class<?> serdeInputClass;
-                    try {
-                        fnInputClass = Class.forName(typeArgs[0].getName(), 
true, userJarLoader);
-                        serdeInputClass = 
Class.forName(serDeTypes[0].getName(), true, userJarLoader);
-                    } catch (ClassNotFoundException e) {
-                        throw new RuntimeException("Failed to load type 
class", e);
-                    }
-
-                    if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-                        throw new RuntimeException("Serializer type mismatch " 
+ typeArgs[0] + " vs " + serDeTypes[0]);
-                    }
-                }
-            });
-            functionConfig.getInputs().forEach((topicName) -> {
-                if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
-                    throw new RuntimeException("Default Serializer does not 
support type " + typeArgs[0]);
-                }
-            });
-            if (!Void.class.equals(typeArgs[1])) {
-                if (functionConfig.getOutputSerdeClassName() == null
-                        || functionConfig.getOutputSerdeClassName().isEmpty()
-                        || 
functionConfig.getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) {
-                    if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
-                        throw new RuntimeException("Default Serializer does 
not support type " + typeArgs[1]);
-                    }
-                } else {
-                    SerDe serDe = (SerDe) 
Reflections.createInstance(functionConfig.getOutputSerdeClassName(), file);
-                    if (serDe == null) {
-                        throw new 
IllegalArgumentException(String.format("SerDe class %s does not exist in jar 
%s",
-                                functionConfig.getOutputSerdeClassName(), 
jarFile));
-                    }
-                    Class<?>[] serDeTypes = 
TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
-                    // type inheritance information seems to be lost in 
generic type
-                    // load the actual type class for verification
-                    Class<?> fnOutputClass;
-                    Class<?> serdeOutputClass;
-                    try {
-                        fnOutputClass = Class.forName(typeArgs[1].getName(), 
true, userJarLoader);
-                        serdeOutputClass = 
Class.forName(serDeTypes[0].getName(), true, userJarLoader);
-                    } catch (ClassNotFoundException e) {
-                        throw new RuntimeException("Failed to load type 
class", e);
-                    }
-
-                    if (!serdeOutputClass.isAssignableFrom(fnOutputClass)) {
-                        throw new RuntimeException("Serializer type mismatch " 
+ typeArgs[1] + " vs " + serDeTypes[0]);
-                    }
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                File file = new File(jarFile);
+                ClassLoader userJarLoader;
+                try {
+                    userJarLoader = Reflections.loadJar(file);
+                } catch (MalformedURLException e) {
+                    throw new RuntimeException("Failed to load user jar " + 
file, e);
                 }
-            }
-        }
-
-        private void doPythonSubmitChecks(FunctionConfig functionConfig) {
-            if (functionConfig.getClassName() == null) {
-                throw new IllegalArgumentException("You specified a Python 
file but no main class name");
-            }
-
-            if (functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                throw new RuntimeException("Effectively-once processing 
guarantees not yet supported in Python");
+                // make sure the function class loader is accessible 
thread-locally
+                Thread.currentThread().setContextClassLoader(userJarLoader);
             }
 
-            if (functionConfig.getWindowConfig() != null) {
-                throw new IllegalArgumentException("There is currently no 
support windowing in python");
-            }
-        }
-
-        private void validateTopicName(String topic) {
-            if (!TopicName.isValid(topic)) {
-                throw new IllegalArgumentException(String.format("The topic 
name %s is invalid", topic));
+            try {
+                // Need to load jar and set context class loader before calling
+                ConfigValidation.validateConfig(functionConfig);
+            } catch (Exception e) {
+                throw new ParameterException(e.getMessage());
             }
         }
 
@@ -592,6 +404,21 @@ public class CmdFunctions extends CmdBase {
             if (StringUtils.isEmpty(functionConfig.getOutput())) {
                 inferMissingOutput(functionConfig);
             }
+
+            if (functionConfig.getParallelism() == 0) {
+                functionConfig.setParallelism(1);
+            }
+
+            WindowConfig windowConfig = functionConfig.getWindowConfig();
+            if (windowConfig != null) {
+                WindowUtils.inferDefaultConfigs(windowConfig);
+                // set auto ack to false since windowing framework is 
responsible
+                // for acking and not the function framework
+                if (autoAck != null && autoAck == true) {
+                    throw new IllegalArgumentException("Cannot enable auto ack 
when using windowing functionality");
+                }
+                functionConfig.setAutoAck(false);
+            }
         }
 
         private void inferMissingFunctionName(FunctionConfig functionConfig) {
@@ -649,7 +476,7 @@ public class CmdFunctions extends CmdBase {
                 } catch (MalformedURLException e) {
                     throw new RuntimeException("Failed to load user jar " + 
file, e);
                 }
-                typeArgs = getFunctionTypes(file, functionConfig);
+                typeArgs = Utils.getFunctionTypes(functionConfig);
             }
 
             FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index a82a195..8f16eac 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
 import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
 import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
 import 
org.apache.pulsar.functions.windowing.evictors.WatermarkCountEvictionPolicy;
@@ -93,7 +94,9 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> {
                 (new 
Gson().toJson(context.getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY).get())),
                 WindowConfig.class);
 
-        WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig);
+
+        WindowUtils.inferDefaultConfigs(windowConfig);
+        
ValidatorImpls.WindowConfigValidator.validateWindowConfig(windowConfig);
         return windowConfig;
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
index 6de61c2..73dda87 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
@@ -25,44 +25,7 @@ public class WindowUtils {
         return String.format("%s/%s/%s", tenant, namespace, name);
     }
 
-    public static void validateAndSetDefaultsWindowConfig(WindowConfig 
windowConfig) {
-        if (windowConfig.getWindowLengthDurationMs() == null && 
windowConfig.getWindowLengthCount() == null) {
-            throw new IllegalArgumentException("Window length is not 
specified");
-        }
-
-        if (windowConfig.getWindowLengthDurationMs() != null && 
windowConfig.getWindowLengthCount() != null) {
-            throw new IllegalArgumentException(
-                    "Window length for time and count are set! Please set one 
or the other.");
-        }
-
-        if (windowConfig.getWindowLengthCount() != null) {
-            if (windowConfig.getWindowLengthCount() <= 0) {
-                throw new IllegalArgumentException(
-                        "Window length must be positive [" + 
windowConfig.getWindowLengthCount() + "]");
-            }
-        }
-
-        if (windowConfig.getWindowLengthDurationMs() != null) {
-            if (windowConfig.getWindowLengthDurationMs() <= 0) {
-                throw new IllegalArgumentException(
-                        "Window length must be positive [" + 
windowConfig.getWindowLengthDurationMs() + "]");
-            }
-        }
-
-        if (windowConfig.getSlidingIntervalCount() != null) {
-            if (windowConfig.getSlidingIntervalCount() <= 0) {
-                throw new IllegalArgumentException(
-                        "Sliding interval must be positive [" + 
windowConfig.getSlidingIntervalCount() + "]");
-            }
-        }
-
-        if (windowConfig.getSlidingIntervalDurationMs() != null) {
-            if (windowConfig.getSlidingIntervalDurationMs() <= 0) {
-                throw new IllegalArgumentException(
-                        "Sliding interval must be positive [" + 
windowConfig.getSlidingIntervalDurationMs() + "]");
-            }
-        }
-
+    public static void inferDefaultConfigs(WindowConfig windowConfig) {
         if (windowConfig.getWindowLengthDurationMs() != null && 
windowConfig.getSlidingIntervalDurationMs() == null) {
             
windowConfig.setSlidingIntervalDurationMs(windowConfig.getWindowLengthDurationMs());
         }
@@ -72,20 +35,10 @@ public class WindowUtils {
         }
 
         if (windowConfig.getTimestampExtractorClassName() != null) {
-            if (windowConfig.getMaxLagMs() != null) {
-                if (windowConfig.getMaxLagMs() <= 0) {
-                    throw new IllegalArgumentException(
-                            "Lag duration must be positive [" + 
windowConfig.getMaxLagMs() + "]");
-                }
-            } else {
+            if (windowConfig.getMaxLagMs() == null) {
                 
windowConfig.setMaxLagMs(WindowFunctionExecutor.DEFAULT_MAX_LAG_MS);
             }
-            if (windowConfig.getWatermarkEmitIntervalMs() != null) {
-                if (windowConfig.getWatermarkEmitIntervalMs() <= 0) {
-                    throw new IllegalArgumentException(
-                            "Watermark interval must be positive [" + 
windowConfig.getWatermarkEmitIntervalMs() + "]");
-                }
-            } else {
+            if (windowConfig.getWatermarkEmitIntervalMs() == null) {
                 
windowConfig.setWatermarkEmitIntervalMs(WindowFunctionExecutor.DEFAULT_WATERMARK_EVENT_INTERVAL_MS);
             }
         }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index 33565bc..c5d5d4a 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -537,8 +537,8 @@ public class WindowFunctionExecutorTest {
                 if (arg0 == null) {
                     
Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs(),
                             new 
Long(testWindowedPulsarFunction.DEFAULT_MAX_LAG_MS));
-                } else if((Long) arg0 <= 0) {
-                    fail(String.format("Window lag cannot be zero or less -- 
lagTime: %s", arg0));
+                } else if((Long) arg0 < 0) {
+                    fail(String.format("Window lag cannot be less than zero -- 
lagTime: %s", arg0));
                 } else {
                     
Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs().longValue(),
                             maxLagMs.longValue());
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index b278ed7..01710b1 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -82,6 +82,11 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>net.jodah</groupId>
+      <artifactId>typetools</artifactId>
+    </dependency>
+
   </dependencies>
 
 </project>
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index ffdda63..2940e32 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -23,10 +23,21 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.SerDe;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidFunctionConfig;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
+import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidWindowConfig;
+import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
 
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
 
 @Getter
@@ -34,6 +45,7 @@ import java.util.Map;
 @Data
 @EqualsAndHashCode
 @ToString
+@isValidFunctionConfig
 public class FunctionConfig {
 
     public enum ProcessingGuarantees {
@@ -61,25 +73,37 @@ public class FunctionConfig {
         PYTHON
     }
 
+
+    @NotNull
     private String tenant;
+    @NotNull
     private String namespace;
+    @NotNull
     private String name;
+    @NotNull
+    @isImplementationOfClasses(implementsClasses = {Function.class, 
java.util.function.Function.class})
     private String className;
-
+    @isListEntryCustom(entryValidatorClasses = 
{ValidatorImpls.TopicNameValidator.class})
     private Collection<String> inputs;
+    @isMapEntryCustom(keyValidatorClasses = { 
ValidatorImpls.TopicNameValidator.class },
+            valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })
     private Map<String, String> customSerdeInputs;
-
+    @isValidTopicName
     private String output;
+    @isImplementationOfClass(implementsClass = SerDe.class)
     private String outputSerdeClassName;
-
+    @isValidTopicName
     private String logTopic;
     private ProcessingGuarantees processingGuarantees;
     private Map<String, Object> userConfig;
     private SubscriptionType subscriptionType;
     private Runtime runtime;
     private boolean autoAck;
+    @isPositiveNumber
     private int parallelism;
+    @isValidResources
     private Resources resources;
     private String fqfn;
+    @isValidWindowConfig
     private WindowConfig windowConfig;
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 68ac86b..2963e57 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -21,17 +21,28 @@ package org.apache.pulsar.functions.utils;
 import com.google.protobuf.AbstractMessage.Builder;
 import com.google.protobuf.MessageOrBuilder;
 import com.google.protobuf.util.JsonFormat;
+
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.net.ServerSocket;
+import java.util.Collection;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.functions.api.Function;
+
+import net.jodah.typetools.TypeResolver;
 
 /**
  * Utils used for runtime.
  */
+@Slf4j
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class Utils {
 
@@ -76,4 +87,66 @@ public class Utils {
             throw new RuntimeException("No free port found", ex);
         }
     }
+
+    public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig) {
+
+        Object userClass = createInstance(functionConfig.getClassName(), 
Thread.currentThread().getContextClassLoader());
+
+        Class<?>[] typeArgs;
+        // if window function
+        if (functionConfig.getWindowConfig() != null) {
+            java.util.function.Function function = 
(java.util.function.Function) userClass;
+            if (function == null) {
+                throw new IllegalArgumentException(String.format("The Java 
util function class %s could not be instantiated",
+                        functionConfig.getClassName()));
+            }
+            typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, 
function.getClass());
+            if (!typeArgs[0].equals(Collection.class)) {
+                throw new IllegalArgumentException("Window function must take 
a collection as input");
+            }
+            Type type = 
TypeResolver.resolveGenericType(java.util.function.Function.class, 
function.getClass());
+            Type collectionType = ((ParameterizedType) 
type).getActualTypeArguments()[0];
+            Type actualInputType = ((ParameterizedType) 
collectionType).getActualTypeArguments()[0];
+            typeArgs[0] = (Class<?>) actualInputType;
+        } else {
+            if (userClass instanceof Function) {
+                Function pulsarFunction = (Function) userClass;
+                typeArgs = TypeResolver.resolveRawArguments(Function.class, 
pulsarFunction.getClass());
+            } else {
+                java.util.function.Function function = 
(java.util.function.Function) userClass;
+                typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, 
function.getClass());
+            }
+        }
+
+        return typeArgs;
+    }
+
+    public static Object createInstance(String userClassName, ClassLoader 
classLoader) {
+        Class<?> theCls;
+        try {
+            theCls = Class.forName(userClassName);
+        } catch (ClassNotFoundException cnfe) {
+            try {
+                theCls = Class.forName(userClassName, true, classLoader);
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException("User class must be in class path", 
cnfe);
+            }
+        }
+        Object result;
+        try {
+            Constructor<?> meth = theCls.getDeclaredConstructor();
+            meth.setAccessible(true);
+            result = meth.newInstance();
+        } catch (InstantiationException ie) {
+            throw new RuntimeException("User class must be concrete", ie);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException("User class doesn't have such method", 
e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("User class must have a no-arg 
constructor", e);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException("User class constructor throws 
exception", e);
+        }
+        return result;
+
+    }
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
new file mode 100644
index 0000000..b665d1a
--- /dev/null
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidation.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class ConfigValidation {
+
+    public static void validateConfig(Object config) {
+        for (Field field : config.getClass().getDeclaredFields()) {
+            Object value = null;
+            field.setAccessible(true);
+            try {
+                value = field.get(config);
+            } catch (IllegalAccessException e) {
+               throw new RuntimeException(e);
+            }
+            validateField(field, value);
+        }
+        validateClass(config);
+    }
+
+    private static void validateClass(Object config) {
+        processAnnotations(config.getClass().getAnnotations(), 
config.getClass().getName(), config);
+    }
+
+    private static void validateField(Field field, Object value) {
+        processAnnotations(field.getAnnotations(), field.getName(), value);
+    }
+
+    private static void processAnnotations(Annotation[] annotations, String 
fieldName, Object value) {
+        try {
+            for (Annotation annotation : annotations) {
+
+                String type = annotation.annotationType().getName();
+                Class<?> validatorClass = null;
+                Class<?>[] classes = 
ConfigValidationAnnotations.class.getDeclaredClasses();
+                //check if annotation is one of ours
+                for (Class<?> clazz : classes) {
+                    if (clazz.getName().equals(type)) {
+                        validatorClass = clazz;
+                        break;
+                    }
+                }
+                if (validatorClass != null) {
+                    Object v = validatorClass.cast(annotation);
+                    @SuppressWarnings("unchecked")
+                    Class<Validator> clazz = (Class<Validator>) validatorClass
+                            
.getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
+                    Validator o = null;
+                    Map<String, Object> params = 
getParamsFromAnnotation(validatorClass, v);
+                    //two constructor signatures used to initialize validators.
+                    //One constructor takes input a Map of arguments, the 
other doesn't take any arguments (default constructor)
+                    //If validator has a constructor that takes a Map as an 
argument call that constructor
+                    if (hasConstructor(clazz, Map.class)) {
+                        o = 
clazz.getConstructor(Map.class).newInstance(params);
+                    } else { //If not call default constructor
+                        o = clazz.newInstance();
+                    }
+                    o.validateField(fieldName, value);
+                }
+            }
+        } catch (NoSuchMethodException | IllegalAccessException | 
InstantiationException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Map<String, Object> getParamsFromAnnotation(Class<?> 
validatorClass, Object v)
+            throws InvocationTargetException, IllegalAccessException {
+        Map<String, Object> params = new HashMap<String, Object>();
+        for (Method method : validatorClass.getDeclaredMethods()) {
+
+            Object value = null;
+            try {
+                value = (Object) method.invoke(v);
+            } catch (IllegalArgumentException ex) {
+                value = null;
+            }
+            if (value != null) {
+                params.put(method.getName(), value);
+            }
+        }
+        return params;
+    }
+
+    public static boolean hasConstructor(Class<?> clazz, Class<?> paramClass) {
+        Class<?>[] classes = { paramClass };
+        try {
+            clazz.getConstructor(classes);
+        } catch (NoSuchMethodException e) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
new file mode 100644
index 0000000..f08cbba
--- /dev/null
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+public class ConfigValidationAnnotations {
+
+    /**
+     * Validates that an object is not null
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface NotNull {
+        Class<?> validatorClass() default 
ValidatorImpls.NotNullValidator.class;
+    }
+
+    /**
+     * Checks if a number is positive, optionally allowing zero. Validator 
with fields: validatorClass, includeZero
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isPositiveNumber {
+        Class<?> validatorClass() default 
ValidatorImpls.PositiveNumberValidator.class;
+
+        boolean includeZero() default false;
+    }
+
+
+    /**
+     * Checks if resources specified are valid
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isValidResources {
+
+        Class<?> validatorClass() default 
ValidatorImpls.ResourcesValidator.class;
+    }
+
+    /**
+     * validates each entry in a list is of a certain type
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isListEntryType {
+        Class<?> validatorClass() default 
ValidatorImpls.ListEntryTypeValidator.class;
+
+        Class<?> type();
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isStringList {
+        Class<?> validatorClass() default 
ValidatorImpls.ListEntryTypeValidator.class;
+
+        Class<?> type() default String.class;
+    }
+
+    /**
+     * Validates each entry in a list with a list of validators. Validator 
with fields: validatorClass and entryValidatorClasses
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isListEntryCustom {
+        Class<?> validatorClass() default 
ValidatorImpls.ListEntryCustomValidator.class;
+
+        Class<?>[] entryValidatorClasses();
+    }
+
+
+    /**
+     * Validates the type of each key and value in a map. Validator with 
fields: validatorClass, keyType, valueType
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isMapEntryType {
+        Class<?> validatorClass() default 
ValidatorImpls.MapEntryTypeValidator.class;
+
+        Class<?> keyType();
+
+        Class<?> valueType();
+    }
+
+    /**
+     * Checks if class name is assignable to the provided class/interfaces
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isImplementationOfClass {
+        Class<?> validatorClass() default 
ValidatorImpls.ImplementsClassValidator.class;
+
+        Class<?> implementsClass();
+    }
+
+    /**
+     * Checks if class name is assignable to ONE of the provided list of 
classes/interfaces
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isImplementationOfClasses {
+        Class<?> validatorClass() default 
ValidatorImpls.ImplementsClassesValidator.class;
+
+        Class<?>[] implementsClasses();
+    }
+
+    /**
+     * Validates each key and value in a Map with a list of validators. 
Validator with fields: validatorClass, keyValidatorClasses,
+     * valueValidatorClasses
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isMapEntryCustom {
+        Class<?> validatorClass() default 
ValidatorImpls.MapEntryCustomValidator.class;
+
+        Class<?>[] keyValidatorClasses();
+
+        Class<?>[] valueValidatorClasses();
+    }
+
+    /**
+     * checks if the topic name is valid
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isValidTopicName {
+        Class<?> validatorClass() default 
ValidatorImpls.TopicNameValidator.class;
+    }
+
+    /**
+     * checks if the window config is valid
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isValidWindowConfig {
+        Class<?> validatorClass() default 
ValidatorImpls.WindowConfigValidator.class;
+    }
+
+    /**
+     * checks function config as a whole to make sure all fields are valid
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target({ElementType.TYPE})
+    public @interface isValidFunctionConfig {
+        Class<?> validatorClass() default 
ValidatorImpls.FunctionConfigValidator.class;
+    }
+
+    /**
+     * Field names for annotations
+     */
+    public static class ValidatorParams {
+        static final String VALIDATOR_CLASS = "validatorClass";
+        static final String TYPE = "type";
+        static final String BASE_TYPE = "baseType";
+        static final String ENTRY_VALIDATOR_CLASSES = "entryValidatorClasses";
+        static final String KEY_VALIDATOR_CLASSES = "keyValidatorClasses";
+        static final String VALUE_VALIDATOR_CLASSES = "valueValidatorClasses";
+        static final String KEY_TYPE = "keyType";
+        static final String VALUE_TYPE = "valueType";
+        static final String INCLUDE_ZERO = "includeZero";
+        static final String ACCEPTED_VALUES = "acceptedValues";
+        static final String IMPLEMENTS_CLASS = "implementsClass";
+        static final String IMPLEMENTS_CLASSES = "implementsClasses";
+    }
+}
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java
new file mode 100644
index 0000000..f81cf06
--- /dev/null
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationUtils.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import java.util.Map;
+
+public class ConfigValidationUtils {
+    /**
+     * Returns a new NestableFieldValidator for a given class.
+     *
+     * @param cls     the Class the field should be a type of
+     * @param notNull whether or not a value of null is valid
+     * @return a NestableFieldValidator for that class
+     */
+    public static NestableFieldValidator fv(final Class cls, final boolean 
notNull) {
+        return new NestableFieldValidator() {
+            @Override
+            public void validateField(String pd, String name, Object field)
+                throws IllegalArgumentException {
+                if (field == null) {
+                    if (notNull) {
+                        throw new IllegalArgumentException("Field " + name + " 
must not be null");
+                    } else {
+                        return;
+                    }
+                }
+                if (!cls.isInstance(field)) {
+                    throw new IllegalArgumentException(
+                        pd + name + " must be a " + cls.getName() + ". (" + 
field + ")");
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a List of the given Class.
+     *
+     * @param cls     the Class of elements composing the list
+     * @param notNull whether or not a value of null is valid
+     * @return a NestableFieldValidator for a list of the given class
+     */
+    public static NestableFieldValidator listFv(Class cls, boolean notNull) {
+        return listFv(fv(cls, notNull), notNull);
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a List where each item is 
validated by validator.
+     *
+     * @param validator used to validate each item in the list
+     * @param notNull   whether or not a value of null is valid
+     * @return a NestableFieldValidator for a list with each item validated by 
the given validator.
+     */
+    public static NestableFieldValidator listFv(final NestableFieldValidator 
validator,
+                                                final boolean notNull) {
+        return new NestableFieldValidator() {
+            @Override
+            public void validateField(String pd, String name, Object field)
+                throws IllegalArgumentException {
+
+                if (field == null) {
+                    if (notNull) {
+                        throw new IllegalArgumentException("Field " + name + " 
must not be null");
+                    } else {
+                        return;
+                    }
+                }
+                if (field instanceof Iterable) {
+                    for (Object e : (Iterable) field) {
+                        validator.validateField(pd + "Each element of the list 
", name, e);
+                    }
+                    return;
+                }
+                throw new IllegalArgumentException(
+                    "Field " + name + " must be an Iterable but was " +
+                    ((field == null) ? "null" : ("a " + field.getClass())));
+            }
+        };
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a Map of key to val.
+     *
+     * @param key     the Class of keys in the map
+     * @param val     the Class of values in the map
+     * @param notNull whether or not a value of null is valid
+     * @return a NestableFieldValidator for a Map of key to val
+     */
+    public static NestableFieldValidator mapFv(Class key, Class val,
+                                               boolean notNull) {
+        return mapFv(fv(key, false), fv(val, false), notNull);
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a Map.
+     *
+     * @param key     a validator for the keys in the map
+     * @param val     a validator for the values in the map
+     * @param notNull whether or not a value of null is valid
+     * @return a NestableFieldValidator for a Map
+     */
+    public static NestableFieldValidator mapFv(final NestableFieldValidator 
key,
+                                               final NestableFieldValidator 
val, final boolean notNull) {
+        return new NestableFieldValidator() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public void validateField(String pd, String name, Object field)
+                throws IllegalArgumentException {
+                if (field == null) {
+                    if (notNull) {
+                        throw new IllegalArgumentException("Field " + name + " 
must not be null");
+                    } else {
+                        return;
+                    }
+                }
+                if (field instanceof Map) {
+                    for (Map.Entry<Object, Object> entry : ((Map<Object, 
Object>) field).entrySet()) {
+                        key.validateField("Each key of the map ", name, 
entry.getKey());
+                        val.validateField("Each value in the map ", name, 
entry.getValue());
+                    }
+                    return;
+                }
+                throw new IllegalArgumentException(
+                    "Field " + name + " must be a Map");
+            }
+        };
+    }
+
+    /**
+     * Declares methods for validating configuration values.
+     */
+    public static interface FieldValidator {
+        /**
+         * Validates the given field.
+         *
+         * @param name  the name of the field.
+         * @param field The field to be validated.
+         * @throws IllegalArgumentException if the field fails validation.
+         */
+        public void validateField(String name, Object field) throws 
IllegalArgumentException;
+    }
+
+    /**
+     * Declares a method for validating configuration values that is nestable.
+     */
+    public static abstract class NestableFieldValidator implements 
FieldValidator {
+        @Override
+        public void validateField(String name, Object field) throws 
IllegalArgumentException {
+            validateField(null, name, field);
+        }
+
+        /**
+         * Validates the given field.
+         *
+         * @param pd    describes the parent wrapping this validator.
+         * @param name  the name of the field.
+         * @param field The field to be validated.
+         * @throws IllegalArgumentException if the field fails validation.
+         */
+        public abstract void validateField(String pd, String name, Object 
field) throws IllegalArgumentException;
+    }
+}
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
new file mode 100644
index 0000000..5941048
--- /dev/null
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/Validator.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import java.util.Map;
+
+public abstract class Validator {
+    public Validator(Map<String, Object> params) {
+    }
+
+    public Validator() {
+    }
+
+    public abstract void validateField(String name, Object o);
+}
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
new file mode 100644
index 0000000..64c23f5
--- /dev/null
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -0,0 +1,655 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.validation;
+
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Resources;
+import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.WindowConfig;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+
+@Slf4j
+public class ValidatorImpls {
+    /**
+     * Validates a positive number.
+     *
+     * <p>A {@code null} value is accepted. Any other value must be a {@link Number} whose
+     * {@code doubleValue()} is strictly greater than zero, or greater than or equal to zero when
+     * zero is explicitly allowed.
+     */
+    public static class PositiveNumberValidator extends Validator {
+
+        private final boolean includeZero;
+
+        /** Creates a validator that rejects zero. */
+        public PositiveNumberValidator() {
+            this.includeZero = false;
+        }
+
+        /**
+         * @param params validator parameters; zero is accepted only when the {@code INCLUDE_ZERO}
+         *               entry is {@code Boolean.TRUE}. A missing map or a missing entry means zero
+         *               is rejected.
+         */
+        public PositiveNumberValidator(Map<String, Object> params) {
+            // Boolean.TRUE.equals(...) avoids the NullPointerException that unboxing a missing entry would raise.
+            this.includeZero = params != null
+                    && Boolean.TRUE.equals(params.get(ConfigValidationAnnotations.ValidatorParams.INCLUDE_ZERO));
+        }
+
+        /**
+         * Checks that {@code o} is absent or a positive number.
+         *
+         * @param name        field name used in the error message
+         * @param includeZero whether zero counts as valid
+         * @param o           value to check; {@code null} is accepted
+         * @throws IllegalArgumentException if {@code o} is not a {@link Number} or is out of range
+         */
+        public static void validateField(String name, boolean includeZero, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (o instanceof Number) {
+                double value = ((Number) o).doubleValue();
+                // NaN fails both comparisons and is therefore rejected below.
+                if (includeZero ? value >= 0.0 : value > 0.0) {
+                    return;
+                }
+            }
+            throw new IllegalArgumentException(String.format("Field '%s' must be a Positive Number", name));
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, this.includeZero, o);
+        }
+    }
+
+    /**
+     * Rejects {@code null} field values.
+     */
+    public static class NotNullValidator extends Validator {
+
+        /**
+         * @throws IllegalArgumentException if {@code o} is {@code null}
+         */
+        @Override
+        public void validateField(String name, Object o) {
+            if (o != null) {
+                return;
+            }
+            final String message = String.format("Field '%s' cannot be null!", name);
+            throw new IllegalArgumentException(message);
+        }
+    }
+
+    /**
+     * Validates a {@link Resources} value: it must be present, of the right type, and every
+     * cpu/ram/disk amount that is set must be strictly positive.
+     */
+    @NoArgsConstructor
+    public static class ResourcesValidator extends Validator {
+        /**
+         * @throws IllegalArgumentException if {@code o} is {@code null}, is not a {@link Resources},
+         *                                  or carries a non-positive cpu, ram or disk amount
+         */
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                throw new IllegalArgumentException(String.format("Field '%s' cannot be null!", name));
+            }
+            if (!(o instanceof Resources)) {
+                throw new IllegalArgumentException(String.format("Field '%s' must be of Resource type!", name));
+            }
+
+            final Resources requested = (Resources) o;
+            final Double cpuAmount = requested.getCpu();
+            final Long ramAmount = requested.getRam();
+            final Long diskAmount = requested.getDisk();
+
+            // An unset (null) amount is allowed; a set amount has to be greater than zero.
+            final boolean cpuOk = cpuAmount == null || cpuAmount > 0.0;
+            com.google.common.base.Preconditions.checkArgument(cpuOk,
+                    "The cpu allocation for the function must be positive");
+            final boolean ramOk = ramAmount == null || ramAmount > 0L;
+            com.google.common.base.Preconditions.checkArgument(ramOk,
+                    "The ram allocation for the function must be positive");
+            final boolean diskOk = diskAmount == null || diskAmount > 0L;
+            com.google.common.base.Preconditions.checkArgument(diskOk,
+                    "The disk allocation for the function must be positive");
+        }
+    }
+
+    /**
+     * Validates each entry in a list against a single expected type, taken from the {@code TYPE}
+     * validator parameter.
+     */
+    public static class ListEntryTypeValidator extends Validator {
+
+        private Class<?> type;
+
+        public ListEntryTypeValidator(Map<String, Object> params) {
+            final Object configuredType = params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+            this.type = (Class<?>) configuredType;
+        }
+
+        /**
+         * Delegates to the nestable list validator built by
+         * {@code ConfigValidationUtils.listFv(type, false)}.
+         *
+         * @param name field name passed through to the delegate
+         * @param type expected entry type
+         * @param o    value to validate
+         */
+        public static void validateField(String name, Class<?> type, Object o) {
+            ConfigValidationUtils.listFv(type, false).validateField(name, o);
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, this.type, o);
+        }
+    }
+
+    /**
+     * Validates each key and each value of a map against an expected key type and value type,
+     * taken from the {@code KEY_TYPE} and {@code VALUE_TYPE} validator parameters.
+     */
+    public static class MapEntryTypeValidator extends Validator {
+
+        private Class<?> keyType;
+        private Class<?> valueType;
+
+        public MapEntryTypeValidator(Map<String, Object> params) {
+            final Object configuredKeyType = params.get(ConfigValidationAnnotations.ValidatorParams.KEY_TYPE);
+            final Object configuredValueType = params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_TYPE);
+            this.keyType = (Class<?>) configuredKeyType;
+            this.valueType = (Class<?>) configuredValueType;
+        }
+
+        /**
+         * Delegates to the nestable map validator built by
+         * {@code ConfigValidationUtils.mapFv(keyType, valueType, false)}.
+         *
+         * @param name      field name passed through to the delegate
+         * @param keyType   expected key type
+         * @param valueType expected value type
+         * @param o         value to validate
+         */
+        public static void validateField(String name, Class<?> keyType, Class<?> valueType, Object o) {
+            ConfigValidationUtils.mapFv(keyType, valueType, false).validateField(name, o);
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, this.keyType, this.valueType, o);
+        }
+    }
+
+    /**
+     * Validates that a field holds the name of a class that is assignable to a required class or
+     * interface.
+     *
+     * <p>{@code null} values are accepted. A non-null value must be a {@link String} naming a
+     * loadable class.
+     */
+    public static class ImplementsClassValidator extends Validator {
+
+        Class<?> classImplements;
+
+        /**
+         * @param params validator parameters; the required type is read from {@code IMPLEMENTS_CLASS}
+         */
+        public ImplementsClassValidator(Map<String, Object> params) {
+            this.classImplements = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS);
+        }
+
+        /**
+         * @param classImplements class or interface the named class must be assignable to
+         */
+        public ImplementsClassValidator(Class<?> classImplements) {
+            this.classImplements = classImplements;
+        }
+
+        /**
+         * @throws IllegalArgumentException if the value is not a String, the class cannot be loaded,
+         *                                  or the class is not assignable to the required type
+         */
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            SimpleTypeValidator.validateField(name, String.class, o);
+            String className = (String) o;
+            Class<?> objectClass;
+            try {
+                // Use the shared helper: it tries Class.forName first and copes with a null
+                // context class loader, matching how the other validators resolve classes.
+                objectClass = loadClass(className);
+            } catch (ClassNotFoundException e) {
+                // Report as an invalid argument (still a RuntimeException) and keep the cause.
+                throw new IllegalArgumentException("Cannot find/load class " + className, e);
+            }
+            if (!this.classImplements.isAssignableFrom(objectClass)) {
+                throw new IllegalArgumentException(
+                        String.format("Field '%s' with value '%s' does not implement %s ",
+                                name, o, this.classImplements.getName()));
+            }
+        }
+    }
+
+    /**
+     * Validates that a field holds the name of a class that is assignable to at least one of a set
+     * of classes, taken from the {@code IMPLEMENTS_CLASSES} validator parameter.
+     *
+     * <p>{@code null} values are accepted.
+     */
+    public static class ImplementsClassesValidator extends Validator {
+
+        Class<?>[] classesImplements;
+
+        public ImplementsClassesValidator(Map<String, Object> params) {
+            this.classesImplements = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASSES);
+        }
+
+        /**
+         * @throws IllegalArgumentException if the value is not a String, the class cannot be loaded,
+         *                                  or the class is assignable to none of the candidates
+         */
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            SimpleTypeValidator.validateField(name, String.class, o);
+            String className = (String) o;
+
+            // Resolve the class once; it does not depend on the candidate being tested.
+            Class<?> objectClass;
+            try {
+                objectClass = loadClass(className);
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException("Cannot find/load class " + className, e);
+            }
+
+            for (Class<?> classImplements : classesImplements) {
+                if (classImplements.isAssignableFrom(objectClass)) {
+                    return;
+                }
+            }
+            // Arrays.toString renders the candidate class names; formatting the array itself
+            // with %s would only print its identity hash.
+            throw new IllegalArgumentException(
+                    String.format("Field '%s' with value '%s' does not implement any of these classes %s",
+                            name, o, Arrays.toString(classesImplements)));
+        }
+    }
+
+    /**
+     * Validates that a field names a class implementing {@link SerDe}, by delegating to
+     * {@link ImplementsClassValidator}.
+     */
+    @NoArgsConstructor
+    public static class SerdeValidator extends Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            final ImplementsClassValidator serdeCheck = new ValidatorImpls.ImplementsClassValidator(SerDe.class);
+            serdeCheck.validateField(name, o);
+        }
+    }
+
+    /**
+     * Validates each key and each value of a map against the respective arrays of validator
+     * classes, taken from the {@code KEY_VALIDATOR_CLASSES} and {@code VALUE_VALIDATOR_CLASSES}
+     * validator parameters.
+     */
+    public static class MapEntryCustomValidator extends Validator {
+
+        private Class<?>[] keyValidators;
+        private Class<?>[] valueValidators;
+
+        public MapEntryCustomValidator(Map<String, Object> params) {
+            final Object keyClasses = params.get(ConfigValidationAnnotations.ValidatorParams.KEY_VALIDATOR_CLASSES);
+            final Object valueClasses = params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
+            this.keyValidators = (Class<?>[]) keyClasses;
+            this.valueValidators = (Class<?>[]) valueClasses;
+        }
+
+        /**
+         * Runs every key validator on every key and every value validator on every value.
+         * A {@code null} map is accepted. Validator classes are instantiated through their
+         * no-argument constructor; classes that are not {@link Validator}s are skipped with a warning.
+         *
+         * @param name            field name, used as a prefix in the per-entry field labels
+         * @param keyValidators   validator classes applied to each key
+         * @param valueValidators validator classes applied to each value
+         * @param o               value to validate; must be a {@link Map} when non-null
+         */
+        @SuppressWarnings("unchecked")
+        public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o)
+                throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+            if (o == null) {
+                return;
+            }
+            //check if Map
+            SimpleTypeValidator.validateField(name, Map.class, o);
+            final Map<Object, Object> entries = (Map<Object, Object>) o;
+            for (Map.Entry<Object, Object> entry : entries.entrySet()) {
+                applyEach(keyValidators, name + " Map key", entry.getKey(),
+                        "validator: {} cannot be used in MapEntryCustomValidator to validate keys.  "
+                                + "Individual entry validators must a instance of Validator class");
+                applyEach(valueValidators, name + " Map value", entry.getValue(),
+                        "validator: {} cannot be used in MapEntryCustomValidator to validate values.  "
+                                + "Individual entry validators must a instance of Validator class");
+            }
+        }
+
+        /** Instantiates each validator class and applies it to one key or value, warning on non-validators. */
+        private static void applyEach(Class<?>[] validatorClasses, String label, Object value, String warning)
+                throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+            for (Class<?> validatorClass : validatorClasses) {
+                final Object candidate = validatorClass.getConstructor().newInstance();
+                if (!(candidate instanceof Validator)) {
+                    log.warn(warning, validatorClass.getName());
+                    continue;
+                }
+                ((Validator) candidate).validateField(label, value);
+            }
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            try {
+                validateField(name, this.keyValidators, this.valueValidators, o);
+            } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
+                // Reflection failures while creating a validator are surfaced unchecked.
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Validates that a field is a {@link String} and, when a set of accepted values is configured,
+     * that the value is one of them.
+     */
+    @NoArgsConstructor
+    public static class StringValidator extends Validator {
+
+        // null means "no restriction on the value".
+        private HashSet<String> acceptedValues = null;
+
+        /**
+         * @param params validator parameters; accepted values are read from {@code ACCEPTED_VALUES}.
+         *               An empty array, or an array holding only the empty string, means no restriction.
+         */
+        public StringValidator(Map<String, Object> params) {
+            final String[] configured =
+                    (String[]) params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES);
+            final HashSet<String> values = new HashSet<String>(Arrays.asList(configured));
+
+            final boolean onlyBlank = values.size() == 1 && values.contains("");
+            this.acceptedValues = (values.isEmpty() || onlyBlank) ? null : values;
+        }
+
+        /**
+         * @throws IllegalArgumentException if the value is not a String, or is not among the
+         *                                  configured accepted values
+         */
+        @Override
+        public void validateField(String name, Object o) {
+            SimpleTypeValidator.validateField(name, String.class, o);
+            if (this.acceptedValues == null) {
+                return;
+            }
+            final boolean accepted = this.acceptedValues.contains((String) o);
+            if (!accepted) {
+                throw new IllegalArgumentException(
+                        "Field " + name + " is not an accepted value. Value: " + o + " Accepted values: " + this.acceptedValues);
+            }
+        }
+    }
+    @NoArgsConstructor
+    public static class FunctionConfigValidator extends Validator {
+
+        /**
+         * Checks specific to the Java runtime: every custom input SerDe class must exist, implement
+         * {@link SerDe} and be type-compatible with the function input type; inputs without a custom
+         * SerDe and the output (unless it is {@code Void}) must be supported by {@link DefaultSerDe}
+         * or by the configured output SerDe.
+         *
+         * <p>{@code typeArgs[0]} from {@code Utils.getFunctionTypes} is used as the function input
+         * type and {@code typeArgs[1]} as the output type.
+         *
+         * @param functionConfig configuration being validated
+         * @param name           field name forwarded to nested validators
+         * @throws IllegalArgumentException on an invalid or incompatible input SerDe
+         * @throws RuntimeException         on an unsupported default SerDe type or incompatible output SerDe
+         */
+        private static void doJavaChecks(FunctionConfig functionConfig, String name) {
+            Class<?>[] typeArgs = Utils.getFunctionTypes(functionConfig);
+
+            ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
+            // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
+            // implements SerDe class
+            functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
+
+                // Only the existence of the class matters here; the loaded Class object is not needed.
+                try {
+                    loadClass(inputSerializer);
+                } catch (ClassNotFoundException e) {
+                    throw new IllegalArgumentException(
+                            String.format("The input serialization/deserialization class %s does not exist",
+                                    inputSerializer), e);
+                }
+
+                try {
+                    new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, inputSerializer);
+                } catch (IllegalArgumentException ex) {
+                    throw new IllegalArgumentException(
+                            String.format("The input serialization/deserialization class %s does not implement %s",
+                                    inputSerializer, SerDe.class.getCanonicalName()), ex);
+                }
+
+                if (inputSerializer.equals(DefaultSerDe.class.getName())) {
+                    if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
+                        throw new IllegalArgumentException("The default Serializer does not support type "
+                                + typeArgs[0]);
+                    }
+                } else {
+                    SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
+                    if (serDe == null) {
+                        throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
+                                inputSerializer));
+                    }
+                    Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+
+                    // type inheritance information seems to be lost in generic type
+                    // load the actual type class for verification
+                    Class<?> fnInputClass;
+                    Class<?> serdeInputClass;
+                    try {
+                        fnInputClass = Class.forName(typeArgs[0].getName(), true, clsLoader);
+                        serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
+                    } catch (ClassNotFoundException e) {
+                        throw new IllegalArgumentException("Failed to load type class", e);
+                    }
+
+                    // What the SerDe deserializes must be usable as the function's input.
+                    if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+                        throw new IllegalArgumentException("Serializer type mismatch " + typeArgs[0] + " vs " + serDeTypes[0]);
+                    }
+                }
+            });
+
+            // Inputs without a custom SerDe use the default one. The check does not depend on the
+            // topic, so it is done once instead of once per input topic.
+            if (!functionConfig.getInputs().isEmpty() && !DefaultSerDe.IsSupportedType(typeArgs[0])) {
+                throw new RuntimeException("Default Serializer does not support type " + typeArgs[0]);
+            }
+
+            if (!Void.class.equals(typeArgs[1])) {
+                if (functionConfig.getOutputSerdeClassName() == null
+                        || functionConfig.getOutputSerdeClassName().isEmpty()
+                        || functionConfig.getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) {
+                    if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
+                        throw new RuntimeException("Default Serializer does not support type " + typeArgs[1]);
+                    }
+                } else {
+                    SerDe serDe = (SerDe) Reflections.createInstance(functionConfig.getOutputSerdeClassName(),
+                            clsLoader);
+                    if (serDe == null) {
+                        throw new IllegalArgumentException(String.format("SerDe class %s does not exist",
+                                functionConfig.getOutputSerdeClassName()));
+                    }
+                    Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+
+                    // type inheritance information seems to be lost in generic type
+                    // load the actual type class for verification
+                    Class<?> fnOutputClass;
+                    Class<?> serdeOutputClass;
+                    try {
+                        fnOutputClass = Class.forName(typeArgs[1].getName(), true, clsLoader);
+                        serdeOutputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
+                    } catch (ClassNotFoundException e) {
+                        throw new RuntimeException("Failed to load type class", e);
+                    }
+
+                    // The SerDe must be able to serialize whatever the function produces.
+                    if (!serdeOutputClass.isAssignableFrom(fnOutputClass)) {
+                        throw new RuntimeException("Serializer type mismatch " + typeArgs[1] + " vs " + serDeTypes[0]);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Checks applied when the runtime is not Java: effectively-once processing and windowing
+         * are both rejected.
+         *
+         * @param functionConfig configuration being validated
+         * @param name           field name (not used by these checks)
+         */
+        private static void doPythonChecks(FunctionConfig functionConfig, String name) {
+            final boolean effectivelyOnce =
+                    functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
+            if (effectivelyOnce) {
+                throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
+            }
+
+            final boolean windowed = functionConfig.getWindowConfig() != null;
+            if (windowed) {
+                throw new IllegalArgumentException("There is currently no support windowing in python");
+            }
+        }
+
+        /**
+         * Fails when the output topic is also listed among the input topics.
+         *
+         * @param inputTopics topics the function consumes from
+         * @param outputTopic topic the function publishes to
+         * @throws IllegalArgumentException if {@code inputTopics} contains {@code outputTopic}
+         */
+        private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
+            final boolean clash = inputTopics.contains(outputTopic);
+            if (!clash) {
+                return;
+            }
+            final String message = String.format(
+                    "Output topic %s is also being used as an input topic (topics must be one or the other)",
+                    outputTopic);
+            throw new IllegalArgumentException(message);
+        }
+
+        /**
+         * Runtime-independent checks: at least one input topic, no topic used as both input and
+         * output, effectively-once only with a Failover subscription, and no auto-ack together
+         * with windowing.
+         *
+         * @param functionConfig configuration being validated
+         * @throws RuntimeException         if no input topic is configured
+         * @throws IllegalArgumentException if any of the other checks fails
+         */
+        private static void doCommonChecks(FunctionConfig functionConfig) {
+            if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) {
+                throw new RuntimeException("No input topic(s) specified for the function");
+            }
+
+            // Ensure that topics aren't being used as both input and output
+            verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput());
+            // Topics declared through custom SerDe inputs are input topics too and must be checked as well.
+            String outputTopic = functionConfig.getOutput();
+            if (outputTopic != null && functionConfig.getCustomSerdeInputs().containsKey(outputTopic)) {
+                throw new IllegalArgumentException(
+                        String.format("Output topic %s is also being used as an input topic (topics must be one or the other)",
+                                outputTopic));
+            }
+
+            // A null subscription type is left alone; only an explicit non-Failover type conflicts.
+            if (functionConfig.getSubscriptionType() != null
+                    && functionConfig.getSubscriptionType() != FunctionConfig.SubscriptionType.FAILOVER
+                    && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                throw new IllegalArgumentException("Effectively-once processing semantics can only be achieved using a Failover subscription type");
+            }
+
+            // With windowing, the windowing framework is responsible for acking, not the function
+            // framework, so auto ack must already be off. The configuration is only checked here,
+            // not modified.
+            if (functionConfig.getWindowConfig() != null && functionConfig.isAutoAck()) {
+                throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
+            }
+        }
+
+        /**
+         * Validates a whole {@link FunctionConfig}: the common checks first, then the Java checks
+         * when the runtime is {@code JAVA} and the Python checks otherwise.
+         *
+         * @param name field name, used in error messages and forwarded to nested checks
+         * @param o    value to validate; must be a {@link FunctionConfig}
+         * @throws IllegalArgumentException if {@code o} is {@code null} or not a {@link FunctionConfig}
+         */
+        @Override
+        public void validateField(String name, Object o) {
+            // Fail with a descriptive message instead of a ClassCastException/NullPointerException.
+            if (!(o instanceof FunctionConfig)) {
+                throw new IllegalArgumentException(String.format("Field '%s' must be of FunctionConfig type!", name));
+            }
+            FunctionConfig functionConfig = (FunctionConfig) o;
+            doCommonChecks(functionConfig);
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                doJavaChecks(functionConfig, name);
+            } else {
+                doPythonChecks(functionConfig, name);
+            }
+        }
+    }
+
+    /**
+     * Validates each entry in a list against a list of custom Validators, taken from the
+     * {@code ENTRY_VALIDATOR_CLASSES} validator parameter. Each validator class must inherit from
+     * or be an instance of the Validator class; others are skipped with a warning.
+     */
+    public static class ListEntryCustomValidator extends Validator {
+
+        private Class<?>[] entryValidators;
+
+        public ListEntryCustomValidator(Map<String, Object> params) {
+            final Object configured = params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES);
+            this.entryValidators = (Class<?>[]) configured;
+        }
+
+        /**
+         * Applies every validator class to every entry. A {@code null} value is accepted.
+         * Validator classes are instantiated through their no-argument constructor.
+         *
+         * @param name       field name, used as a prefix in the per-entry field label
+         * @param validators validator classes applied to each entry
+         * @param o          value to validate; must be an {@link Iterable} when non-null
+         */
+        public static void validateField(String name, Class<?>[] validators, Object o)
+                throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+            if (o == null) {
+                return;
+            }
+            //check if iterable
+            SimpleTypeValidator.validateField(name, Iterable.class, o);
+            final Iterable<?> entries = (Iterable<?>) o;
+            for (Object entry : entries) {
+                for (Class<?> validatorClass : validators) {
+                    final Object candidate = validatorClass.getConstructor().newInstance();
+                    if (!(candidate instanceof Validator)) {
+                        log.warn(
+                                "validator: {} cannot be used in ListEntryCustomValidator.  Individual entry validators must a instance of "
+                                        + "Validator class",
+                                validatorClass.getName());
+                        continue;
+                    }
+                    ((Validator) candidate).validateField(name + " list entry", entry);
+                }
+            }
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            try {
+                validateField(name, this.entryValidators, o);
+            } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
+                // Reflection failures while creating a validator are surfaced unchecked.
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Validates that a field holds a topic name accepted by {@code TopicName.isValid}.
+     * {@code null} values are accepted.
+     */
+    @NoArgsConstructor
+    public static class TopicNameValidator extends Validator {
+
+        /**
+         * @throws IllegalArgumentException if the value is not a String or is not a valid topic name
+         */
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            // Type check first, so the cast below is safe.
+            new StringValidator().validateField(name, o);
+            final String topic = (String) o;
+            if (TopicName.isValid(topic)) {
+                return;
+            }
+            throw new IllegalArgumentException(
+                    String.format("The topic name %s is invalid for field '%s'", topic, name));
+        }
+    }
+
+    /**
+     * Validates a {@link WindowConfig}: exactly one kind of window length must be set, and the
+     * lengths, sliding intervals, lag and watermark interval that are set must be in range.
+     */
+    public static class WindowConfigValidator extends Validator {
+
+        /**
+         * Checks the numeric settings of a window configuration.
+         *
+         * @param windowConfig configuration to check
+         * @throws IllegalArgumentException if a setting is missing, contradictory or out of range
+         */
+        public static void validateWindowConfig(WindowConfig windowConfig) {
+            final boolean hasDurationLength = windowConfig.getWindowLengthDurationMs() != null;
+            final boolean hasCountLength = windowConfig.getWindowLengthCount() != null;
+
+            if (!hasDurationLength && !hasCountLength) {
+                throw new IllegalArgumentException("Window length is not specified");
+            }
+            if (hasDurationLength && hasCountLength) {
+                throw new IllegalArgumentException(
+                        "Window length for time and count are set! Please set one or the other.");
+            }
+
+            if (hasCountLength && windowConfig.getWindowLengthCount() <= 0) {
+                throw new IllegalArgumentException(
+                        "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]");
+            }
+            if (hasDurationLength && windowConfig.getWindowLengthDurationMs() <= 0) {
+                throw new IllegalArgumentException(
+                        "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]");
+            }
+
+            if (windowConfig.getSlidingIntervalCount() != null && windowConfig.getSlidingIntervalCount() <= 0) {
+                throw new IllegalArgumentException(
+                        "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]");
+            }
+            if (windowConfig.getSlidingIntervalDurationMs() != null
+                    && windowConfig.getSlidingIntervalDurationMs() <= 0) {
+                throw new IllegalArgumentException(
+                        "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]");
+            }
+
+            // Lag and watermark settings are only checked when a timestamp extractor is configured.
+            if (windowConfig.getTimestampExtractorClassName() == null) {
+                return;
+            }
+            // Zero lag is allowed; only negative values are rejected.
+            if (windowConfig.getMaxLagMs() != null && windowConfig.getMaxLagMs() < 0) {
+                throw new IllegalArgumentException(
+                        "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]");
+            }
+            if (windowConfig.getWatermarkEmitIntervalMs() != null
+                    && windowConfig.getWatermarkEmitIntervalMs() <= 0) {
+                throw new IllegalArgumentException(
+                        "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]");
+            }
+        }
+
+        /**
+         * @throws IllegalArgumentException if the value is not a {@link WindowConfig} or fails
+         *                                  {@link #validateWindowConfig(WindowConfig)}
+         */
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (o instanceof WindowConfig) {
+                validateWindowConfig((WindowConfig) o);
+                return;
+            }
+            throw new IllegalArgumentException(String.format("Field '%s' must be of WindowConfig type!", name));
+        }
+    }
+
+    /**
+     * Validates basic types: the value must be an instance of the class given by the {@code TYPE}
+     * validator parameter. {@code null} values are accepted.
+     */
+    public static class SimpleTypeValidator extends Validator {
+
+        private Class<?> type;
+
+        public SimpleTypeValidator(Map<String, Object> params) {
+            final Object configuredType = params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+            this.type = (Class<?>) configuredType;
+        }
+
+        /**
+         * @param name field name used in the error message
+         * @param type class the value must be an instance of
+         * @param o    value to check; {@code null} is accepted
+         * @throws IllegalArgumentException if {@code o} is non-null and not an instance of {@code type}
+         */
+        public static void validateField(String name, Class<?> type, Object o) {
+            final boolean acceptable = o == null || type.isInstance(o);
+            if (!acceptable) {
+                throw new IllegalArgumentException(
+                        "Field " + name + " must be of type " + type + ". Object: " + o + " actual type: " + o.getClass());
+            }
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, this.type, o);
+        }
+    }
+
+    /**
+     * Loads a class by name, first through {@code Class.forName} and, if that fails, through the
+     * current thread's context class loader.
+     *
+     * @param className fully qualified class name
+     * @return the loaded class
+     * @throws ClassNotFoundException if neither lookup finds the class, or the first lookup fails
+     *                                and there is no context class loader
+     */
+    private static Class<?> loadClass(String className) throws ClassNotFoundException {
+        try {
+            return Class.forName(className);
+        } catch (ClassNotFoundException e) {
+            final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+            if (contextLoader == null) {
+                // No fallback available: report the original failure.
+                throw e;
+            }
+            return contextLoader.loadClass(className);
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
jerryp...@apache.org.

Reply via email to