Re: flinksql执行时提示自定义UDF无法加载的

2022-04-11 文章 Zhanghao Chen
你好,可以贴下客户端的具体提交命令吗?

Best,
Zhanghao Chen

From: 799590...@qq.com.INVALID <799590...@qq.com.INVALID>
Sent: Tuesday, April 12, 2022 10:46
To: user-zh 
Subject: flinksql执行时提示自定义UDF无法加载的

环境信息

flink-1.13.6_scala_2.11
java 1.8

使用的是 standalonesession 集群模式,node01 为 JobManager,node02 和 node03 为 TaskManager。

UDF代码
package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Flink scalar UDF returning the substring of {@code s} from {@code start} (inclusive) to
 * {@code end} (exclusive), using 0-based indices as in {@link String#substring(int, int)}.
 *
 * <p>SQL-friendly semantics: a NULL argument yields NULL instead of failing the task with a
 * NullPointerException, and out-of-range indices are clamped to the string bounds instead of
 * throwing StringIndexOutOfBoundsException (an {@code end} before {@code start} yields "").
 */
public class SubStr extends ScalarFunction {

    /**
     * @param s     input string; may be NULL
     * @param start 0-based inclusive start index; may be NULL
     * @param end   0-based exclusive end index; may be NULL
     * @return the clamped substring, or {@code null} if any argument is NULL
     */
    public String eval(String s, Integer start, Integer end) {
        if (s == null || start == null || end == null) {
            // SQL NULL propagation: unboxing a null Integer would otherwise throw.
            return null;
        }
        int length = s.length();
        int from = Math.max(0, Math.min(start, length));
        // Never let the end index fall before the start index or past the string end.
        int to = Math.max(from, Math.min(end, length));
        return s.substring(from, to);
    }
}

UDF 的 jar 存储在 HDFS 上。每次客户端提交 SQL 时,都会通过类加载器从 HDFS 加载 UDF jar 列表,并将 pipeline.jars 设置为 HDFS 上的 UDF jar 路径列表。执行下面的 SQL 时报错:

insert into output_2455_5070_model_1649729386269 select tablekeymd5(user_id) as 
mm ,proctime(),MD5(CONCAT_WS(CAST(user_id AS STRING))) from (select distinct id 
as id, user_id as user_id, status as status from (select id,user_id,status from 
data_2455_5068_model) where status < '4')

2022-04-12 10:26:36
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.udf.TableKeyMd5
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.TableKeyMd5
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)

类加载器代码:
/**
 * Appends a jar URL to the JVM's system class loader so that classes in the jar become
 * loadable in the JVM where this method runs (it does not affect any other JVM).
 *
 * <p>Best-effort: failures are reported to standard error and the method returns normally.
 * It relies on the system class loader being a {@link URLClassLoader}, which is true on
 * Java 8 but not on Java 9+; on such JVMs the call is skipped with a message instead of
 * failing on a cast.
 *
 * @param jarUrl URL of the jar to add; {@code null} is ignored
 */
public static void loadJar(URL jarUrl) {
    if (jarUrl == null) {
        return;
    }
    ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
    if (!(systemLoader instanceof URLClassLoader)) {
        System.err.println("loadJar: system class loader is not a URLClassLoader ("
                + systemLoader.getClass().getName() + "), cannot add " + jarUrl);
        return;
    }
    Method addUrl;
    try {
        addUrl = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e) {
        // Nothing to invoke; previously execution fell through to an NPE on the null method.
        e.printStackTrace();
        return;
    }
    // addURL is protected, so temporarily open it and restore the original flag afterwards.
    boolean wasAccessible = addUrl.isAccessible();
    try {
        if (!wasAccessible) {
            addUrl.setAccessible(true);
        }
        addUrl.invoke(systemLoader, jarUrl);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        addUrl.setAccessible(wasAccessible);
    }
}
/**
 * 如果已经存在factory,则加一个装饰器,将原来的factory和用来读取hdfs的factory都封装进去,按需使用
 *
 * @param fsUrlStreamHandlerFactory
 * @throws Exception
 */
public static void registerFactory(final FsUrlStreamHandlerFactory 
fsUrlStreamHandlerFactory)
throws Exception {
log.info("registerFactory : " + 
fsUrlStreamHandlerFactory.getClass().getName());
final Field factoryField = URL.class.getDeclaredField("factory");
factoryField.setAccessible(true);
final Field lockField = URL.class.getDeclaredField("streamHandlerLock");
lockField.setAccessible(true);
synchronized (lockField.get(null)) {
final URLStreamHandlerFactory originalUrlStreamHandlerFactory = 
(URLStreamHandlerFactory) factoryField.get(null);
factoryField.set(null, null);
URL.setURLStreamHandlerFactory(protocol -> {
if ("hdfs".equals(protocol)) {
return 
fsUrlStreamHandlerFactory.createURLStreamHandler(protocol);
} else {
return 
o

flinksql执行时提示自定义UDF无法加载的

2022-04-11 文章 799590...@qq.com.INVALID
环境信息

flink-1.13.6_scala_2.11
java 1.8

使用的是 standalonesession 集群模式,node01 为 JobManager,node02 和 node03 为 TaskManager。

UDF代码
package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Flink scalar UDF returning the substring of {@code s} from {@code start} (inclusive) to
 * {@code end} (exclusive), using 0-based indices as in {@link String#substring(int, int)}.
 *
 * <p>SQL-friendly semantics: a NULL argument yields NULL instead of failing the task with a
 * NullPointerException, and out-of-range indices are clamped to the string bounds instead of
 * throwing StringIndexOutOfBoundsException (an {@code end} before {@code start} yields "").
 */
public class SubStr extends ScalarFunction {

    /**
     * @param s     input string; may be NULL
     * @param start 0-based inclusive start index; may be NULL
     * @param end   0-based exclusive end index; may be NULL
     * @return the clamped substring, or {@code null} if any argument is NULL
     */
    public String eval(String s, Integer start, Integer end) {
        if (s == null || start == null || end == null) {
            // SQL NULL propagation: unboxing a null Integer would otherwise throw.
            return null;
        }
        int length = s.length();
        int from = Math.max(0, Math.min(start, length));
        // Never let the end index fall before the start index or past the string end.
        int to = Math.max(from, Math.min(end, length));
        return s.substring(from, to);
    }
}

UDF 的 jar 存储在 HDFS 上。每次客户端提交 SQL 时,都会通过类加载器从 HDFS 加载 UDF jar 列表,并将 pipeline.jars 设置为 HDFS 上的 UDF jar 路径列表。执行下面的 SQL 时报错:

insert into output_2455_5070_model_1649729386269 select tablekeymd5(user_id) as 
mm ,proctime(),MD5(CONCAT_WS(CAST(user_id AS STRING))) from (select distinct id 
as id, user_id as user_id, status as status from (select id,user_id,status from 
data_2455_5068_model) where status < '4')

2022-04-12 10:26:36
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.udf.TableKeyMd5
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.TableKeyMd5
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)

类加载器代码:
/**
 * Appends a jar URL to the JVM's system class loader so that classes in the jar become
 * loadable in the JVM where this method runs (it does not affect any other JVM).
 *
 * <p>Best-effort: failures are reported to standard error and the method returns normally.
 * It relies on the system class loader being a {@link URLClassLoader}, which is true on
 * Java 8 but not on Java 9+; on such JVMs the call is skipped with a message instead of
 * failing on a cast.
 *
 * @param jarUrl URL of the jar to add; {@code null} is ignored
 */
public static void loadJar(URL jarUrl) {
    if (jarUrl == null) {
        return;
    }
    ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
    if (!(systemLoader instanceof URLClassLoader)) {
        System.err.println("loadJar: system class loader is not a URLClassLoader ("
                + systemLoader.getClass().getName() + "), cannot add " + jarUrl);
        return;
    }
    Method addUrl;
    try {
        addUrl = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e) {
        // Nothing to invoke; previously execution fell through to an NPE on the null method.
        e.printStackTrace();
        return;
    }
    // addURL is protected, so temporarily open it and restore the original flag afterwards.
    boolean wasAccessible = addUrl.isAccessible();
    try {
        if (!wasAccessible) {
            addUrl.setAccessible(true);
        }
        addUrl.invoke(systemLoader, jarUrl);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        addUrl.setAccessible(wasAccessible);
    }
}
/**
 * Installs a JVM-wide {@link URLStreamHandlerFactory} that resolves {@code hdfs} URLs with the
 * given Hadoop factory and delegates every other protocol to the previously installed factory.
 * If no factory was installed before, it returns {@code null} for other protocols so that
 * {@link URL} falls back to the JDK's built-in handlers.
 *
 * <p>{@link URL#setURLStreamHandlerFactory} may only be called once per JVM, so the private
 * static {@code URL.factory} field is cleared reflectively while holding
 * {@code URL.streamHandlerLock} before the decorating factory is installed. This depends on
 * JDK-internal field names (present in Java 8).
 *
 * @param fsUrlStreamHandlerFactory factory used to create handlers for the {@code hdfs} protocol
 * @throws Exception if the reflective field access fails, or if installing the new factory
 *     fails (in which case the previously installed factory is put back first)
 */
public static void registerFactory(final FsUrlStreamHandlerFactory fsUrlStreamHandlerFactory)
        throws Exception {
    log.info("registerFactory : " + fsUrlStreamHandlerFactory.getClass().getName());
    final Field factoryField = URL.class.getDeclaredField("factory");
    factoryField.setAccessible(true);
    final Field lockField = URL.class.getDeclaredField("streamHandlerLock");
    lockField.setAccessible(true);
    synchronized (lockField.get(null)) {
        final URLStreamHandlerFactory originalUrlStreamHandlerFactory =
                (URLStreamHandlerFactory) factoryField.get(null);
        // setURLStreamHandlerFactory throws Error when a factory is already set, so clear it first.
        factoryField.set(null, null);
        try {
            URL.setURLStreamHandlerFactory(protocol -> {
                if ("hdfs".equals(protocol)) {
                    return fsUrlStreamHandlerFactory.createURLStreamHandler(protocol);
                }
                // With no previous factory, null means "use the JDK default handler";
                // dereferencing it would break every non-hdfs URL (file, jar, http, ...).
                return originalUrlStreamHandlerFactory == null
                        ? null
                        : originalUrlStreamHandlerFactory.createURLStreamHandler(protocol);
            });
        } catch (RuntimeException | Error e) {
            // Do not leave the JVM without the factory that was installed before this call.
            factoryField.set(null, originalUrlStreamHandlerFactory);
            throw e;
        }
    }
}



799590...@qq.com