Hello, could you paste the exact command the client uses to submit the job?

Best,
Zhanghao Chen
________________________________
From: 799590...@qq.com.INVALID <799590...@qq.com.INVALID>
Sent: Tuesday, April 12, 2022 10:46
To: user-zh <user-zh@flink.apache.org>
Subject: Flink SQL reports at execution time that a custom UDF cannot be loaded

Environment
    flink-1.13.6_scala_2.11
    java 1.8
    Standalone session cluster: node01 is the JobManager, node02 and node03 are TaskManagers.

UDF code

    package com.example.udf;

    import org.apache.flink.table.functions.ScalarFunction;

    public class SubStr extends ScalarFunction {

        public String eval(String s, Integer start, Integer end) {
            return s.substring(start, end);
        }
    }

The UDF jars are stored on HDFS. Every time the client submits SQL, it loads the list of UDF jars from HDFS through a class loader and sets pipeline.jars to the list of HDFS paths of those jars. Executing the SQL below fails:

    insert into output_2455_5070_model_1649729386269
    select tablekeymd5(user_id) as mm, proctime(), MD5(CONCAT_WS(CAST(user_id AS STRING)))
    from (select distinct id as id, user_id as user_id, status as status
          from (select id, user_id, status from data_2455_5068_model)
          where status < '4')

2022-04-12 10:26:36
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.udf.TableKeyMd5
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.TableKeyMd5
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)

Class loader code:

    public static void loadJar(URL jarUrl) {
        Method method = null;
        try {
            method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
        } catch (NoSuchMethodException | SecurityException e1) {
            e1.printStackTrace();
        }
        // Remember the method's current accessibility
        boolean accessible = method.isAccessible();
        try {
            // Make the method accessible if it is not already
            if (!accessible) {
                method.setAccessible(true);
            }
            // Get the system class loader
            URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
            // Add the jar path to the system class loader's URL list
            method.invoke(classLoader, jarUrl);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            method.setAccessible(accessible);
        }
    }

    /**
     * If a factory already exists, add a decorator that wraps both the original
     * factory and the factory used to read HDFS, and use whichever one is needed.
     *
     * @param fsUrlStreamHandlerFactory
     * @throws Exception
     */
    public static void registerFactory(final FsUrlStreamHandlerFactory fsUrlStreamHandlerFactory)
            throws Exception {
        log.info("registerFactory : " + fsUrlStreamHandlerFactory.getClass().getName());
        final Field factoryField = URL.class.getDeclaredField("factory");
        factoryField.setAccessible(true);
        final Field lockField = URL.class.getDeclaredField("streamHandlerLock");
        lockField.setAccessible(true);
        synchronized (lockField.get(null)) {
            final URLStreamHandlerFactory originalUrlStreamHandlerFactory =
                    (URLStreamHandlerFactory) factoryField.get(null);
            factoryField.set(null, null);
            URL.setURLStreamHandlerFactory(protocol -> {
                if ("hdfs".equals(protocol)) {
                    return fsUrlStreamHandlerFactory.createURLStreamHandler(protocol);
                } else {
                    return originalUrlStreamHandlerFactory.createURLStreamHandler(protocol);
                }
            });
        }
    }

799590...@qq.com
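
For reference, the loadJar helper above casts the system class loader to URLClassLoader, which holds on Java 8 but not on Java 9 and later, and it only changes the class path of the JVM that calls it (the client), not of the TaskManagers. A minimal sketch of a reflection-free alternative for the client side follows. The class name UdfJarLoader and the method newLoader are hypothetical, not part of Flink or of the code posted above, and a temporary directory stands in for a real jar URL. It does not address whether the TaskManagers can fetch the jars listed in pipeline.jars.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink or of the original post.
final class UdfJarLoader {

    private UdfJarLoader() {}

    // Builds a child class loader over the given jar URLs instead of invoking the
    // protected URLClassLoader.addURL on the system class loader via reflection.
    static URLClassLoader newLoader(List<URL> jarUrls, ClassLoader parent) {
        return new URLClassLoader(jarUrls.toArray(new URL[0]), parent);
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a temporary directory stands in for a real UDF jar location.
        Path dir = Files.createTempDirectory("udf-jars");
        URL url = dir.toUri().toURL();

        ClassLoader previous = Thread.currentThread().getContextClassLoader();
        try (URLClassLoader loader = newLoader(Arrays.asList(url), previous)) {
            // Expose the loader to code that resolves classes through the
            // thread context class loader, only for the duration of this block.
            Thread.currentThread().setContextClassLoader(loader);
            System.out.println("loader URLs: " + Arrays.toString(loader.getURLs()));
        } finally {
            // Restore the original context class loader.
            Thread.currentThread().setContextClassLoader(previous);
        }
    }
}
```

The loader is scoped and closed explicitly, so the client does not mutate JVM-wide state. The classes it loads are still visible only inside the client JVM.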