Hi, could you post the exact command the client uses to submit the job?
Best,
Zhanghao Chen
From: 799590...@qq.com.INVALID <799590...@qq.com.INVALID>
Sent: Tuesday, April 12, 2022 10:46
To: user-zh
Subject: Flink SQL reports that a custom UDF cannot be loaded during execution
Environment
flink-1.13.6_scala_2.11
java 1.8
Running a standalone session cluster: node01 is the JobManager, node02 and node03 are the TaskManagers.
UDF code
package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

public class SubStr extends ScalarFunction {
    public String eval(String s, Integer start, Integer end) {
        return s.substring(start, end);
    }
}
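The UDF jars are stored on HDFS. Each time the client submits SQL, it loads the list of UDF jars from HDFS through a class loader and sets pipeline.jars to the list of HDFS paths of those UDF jars. Executing the SQL below then fails: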
insert into output_2455_5070_model_1649729386269 select tablekeymd5(user_id) as
mm ,proctime(),MD5(CONCAT_WS(CAST(user_id AS STRING))) from (select distinct id
as id, user_id as user_id, status as status from (select id,user_id,status from
data_2455_5068_model) where status < '4')
2022-04-12 10:26:36
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.udf.TableKeyMd5
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.TableKeyMd5
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
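The trace ends in Class.forName failing against the task's user-code class loader. A minimal sketch, using only the JDK, of how one might check whether a class name resolves through a particular class loader. ClassProbe and isLoadable are hypothetical names introduced for illustration; they are not part of Flink or of the code in this post.

```java
// Hypothetical diagnostic helper; not part of Flink or of the original code.
final class ClassProbe {
    private ClassProbe() {}

    /** Returns true if the named class can be resolved through the given class loader. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            // initialize=false: only resolve the class, do not run its static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not visible (or not linkable) through this loader.
            return false;
        }
    }
}
```

For example, calling ClassProbe.isLoadable("com.example.udf.TableKeyMd5", Thread.currentThread().getContextClassLoader()) on the client would show whether the class is visible there. It says nothing about what the TaskManager's class loader can see.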
Class loader code:
public static void loadJar(URL jarUrl) {
    Method method = null;
    try {
        method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e1) {
        e1.printStackTrace();
    }
    // Remember the method's current accessibility
    boolean accessible = method.isAccessible();
    try {
        // Make the method accessible if it is not already
        if (!accessible) {
            method.setAccessible(true);
        }
        // Get the system class loader
        URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
        // Add the jar path to the system class loader's URL search path
        method.invoke(classLoader, jarUrl);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        method.setAccessible(accessible);
    }
}
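Note that loadJar casts the system class loader to URLClassLoader. That cast succeeds on Java 8 (the version used here) but not on Java 9 and later, and adding a URL this way only affects the JVM in which it runs. A minimal sketch of an alternative that avoids reflection by creating a dedicated child class loader follows. UdfJarLoader and newUdfClassLoader are hypothetical names, and the sketch assumes the URLs use a scheme the JVM can already open (hdfs: URLs would still need a stream handler such as the one registered by registerFactory).

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

// Hypothetical helper; not part of Flink or of the original code.
final class UdfJarLoader {
    private UdfJarLoader() {}

    /**
     * Creates a class loader that searches the given jar URLs and delegates
     * to the given parent for everything else. The caller should close it
     * when the loaded classes are no longer needed.
     */
    static URLClassLoader newUdfClassLoader(List<URL> jarUrls, ClassLoader parent) {
        return new URLClassLoader(jarUrls.toArray(new URL[0]), parent);
    }
}
```

A class from the jars can then be resolved with Class.forName(name, true, loader) using the returned loader, without modifying the system class loader.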
/**
 * If a factory is already registered, add a decorator that wraps both the original factory
 * and the factory used for reading HDFS, and delegates to whichever one is needed.
 *
 * @param fsUrlStreamHandlerFactory
 * @throws Exception
 */
public static void registerFactory(final FsUrlStreamHandlerFactory fsUrlStreamHandlerFactory)
        throws Exception {
    log.info("registerFactory : " + fsUrlStreamHandlerFactory.getClass().getName());
    final Field factoryField = URL.class.getDeclaredField("factory");
    factoryField.setAccessible(true);
    final Field lockField = URL.class.getDeclaredField("streamHandlerLock");
    lockField.setAccessible(true);
    synchronized (lockField.get(null)) {
        final URLStreamHandlerFactory originalUrlStreamHandlerFactory =
                (URLStreamHandlerFactory) factoryField.get(null);
        factoryField.set(null, null);
        URL.setURLStreamHandlerFactory(protocol -> {
            if ("hdfs".equals(protocol)) {
                return fsUrlStreamHandlerFactory.createURLStreamHandler(protocol);
            } else {
                return
o