I am developing a Java application that uses UDFs on Flink 1.14. In the main method, it sets the PipelineOptions.JARS config option to dynamically add jar files containing the UDF classes to the user classpath. However, the application fails to load the UDF class from the configured jar files at job launch time and crashes with a ClassNotFoundException.
Is PipelineOptions.JARS the correct option to add files to the classpath on the Job manager and all task managers?

Sample code snippet:

    final Configuration configuration = new Configuration();
    configuration.set(PipelineOptions.JARS, Collections.singletonList("file:///path/to/udf.jar"));
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
    ...
    Class udfClass = Class.forName("demo.MyUDF", ...);
    tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
    ...

Error stack trace:

    Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
        at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
        at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
        at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
        at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
        at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
        at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
        at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
        at org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:437)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:432)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:356)
        ...
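
One possible reading of the trace, offered as an assumption rather than a confirmed diagnosis: the failing lookup goes through the JVM's application class loader (ClassLoaders$AppClassLoader) while the plan is being translated in the client's main method, and that loader would not see a jar that is only listed in PipelineOptions.JARS. The JDK-only sketch below illustrates the difference between the application class loader and a URLClassLoader that also includes the UDF jar. The class name UdfJarClassLoaderSketch and the helpers forJars and canLoad are hypothetical names introduced for illustration, and whether the Flink planner picks up the thread context class loader during translation is not verified here.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

// Hypothetical helper class, not part of Flink; it uses only JDK APIs.
public class UdfJarClassLoaderSketch {

    /** Builds a loader that sees the given jar URLs plus everything the parent sees. */
    public static URLClassLoader forJars(ClassLoader parent, List<String> jarUrls) {
        URL[] urls = new URL[jarUrls.size()];
        for (int i = 0; i < urls.length; i++) {
            try {
                urls[i] = URI.create(jarUrls.get(i)).toURL();
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Bad jar URL: " + jarUrls.get(i), e);
            }
        }
        return new URLClassLoader(urls, parent);
    }

    /** Returns true if the loader can resolve the class name (without initializing the class). */
    public static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader appLoader = Thread.currentThread().getContextClassLoader();
        // Same jar path as in the question's snippet.
        try (URLClassLoader udfLoader = forJars(appLoader, List.of("file:///path/to/udf.jar"))) {
            System.out.println("app loader sees demo.MyUDF: " + canLoad(appLoader, "demo.MyUDF"));
            System.out.println("udf loader sees demo.MyUDF: " + canLoad(udfLoader, "demo.MyUDF"));

            // Assumption: code that resolves classes through the thread context
            // class loader would now also find classes from the UDF jar.
            Thread.currentThread().setContextClassLoader(udfLoader);
            try {
                // ... build and translate the pipeline here ...
            } finally {
                // Restore the original loader so later code is unaffected.
                Thread.currentThread().setContextClassLoader(appLoader);
            }
        }
    }
}
```

If the first line prints false and the second prints true when run against the real jar, the class is only reachable through a loader that includes the jar, which would be consistent with the ClassNotFoundException being raised from AppClassLoader.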