I am developing a Java application on Flink 1.14 that uses UDFs. In the main
method it uses the PipelineOptions.JARS config to dynamically add the jar
files containing the UDF classes to the user classpath. However, at job
launch time the application fails to load the UDF class from the configured
jar files and crashes with a ClassNotFoundException.

Is PipelineOptions.JARS the correct option for adding jar files to the
classpath on the JobManager and all TaskManagers?

Sample code snippet:

final Configuration configuration = new Configuration();
configuration.set(PipelineOptions.JARS,
        Collections.singletonList("file:///path/to/udf.jar"));
StreamExecutionEnvironment streamEnv =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
...
Class udfClass = Class.forName("demo.MyUDF", ...);
tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
...

Error stack trace:
Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
    at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
    at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
    at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
    at org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:437)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:432)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:356)
    ...
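One workaround I am considering is loading the jar explicitly with a
URLClassLoader in the main method before calling Class.forName. Below is a
minimal, self-contained sketch of that idea (the class name UdfLoaderSketch
is mine, and the jar path is the same placeholder as in the snippet above,
not a real file); in the real application the resulting Class would then be
passed to createTemporarySystemFunction:

```java
import java.net.URL;
import java.net.URLClassLoader;

public class UdfLoaderSketch {

    // Tries to load className from the given jar URL, delegating to the
    // application class loader first; returns a short status string.
    static String tryLoad(String jarUrl, String className) {
        try (URLClassLoader loader = new URLClassLoader(
                new URL[] { new URL(jarUrl) },
                UdfLoaderSketch.class.getClassLoader())) {
            Class<?> c = Class.forName(className, true, loader);
            return "Loaded: " + c.getName();
        } catch (Exception e) {
            // ClassNotFoundException lands here when the jar or class is absent.
            return "Not found: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Placeholder path, as in the snippet above; in the real job this
        // would point at the actual UDF jar.
        System.out.println(tryLoad("file:///path/to/udf.jar", "demo.MyUDF"));
    }
}
```

But even if this makes Class.forName succeed on the client, I am unsure
whether the planner and the TaskManagers would pick the class up, which is
why I am asking whether PipelineOptions.JARS is the intended mechanism.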
