使用自定义函数时,入参填写字段名会报 Cannot load user class: com.example.udf.SubStr,入参填写字符串常量则正常
环境信息 flink-1.13.6_scala_2.11 java 1.8 使用的是standalonesession集群模式,node01为jobmanager node02和node03为taskmanager 自定义函数的代码 package com.example.udf; import org.apache.flink.table.functions.ScalarFunction; public class SubStr extends ScalarFunction { public String eval(String s, Integer start, Integer end) { return s.substring(start.intValue(), end.intValue()); } } 提交到集群的sql代码 [ "DROP TABLE IF EXISTS source_datagen", "CREATE TABLE source_datagen(f_sequence INT,f_random INT,f_random_str STRING,ts AS localtimestamp,WATERMARK FOR ts AS ts) WITH ('connector' = 'datagen','rows-per-second'='5','fields.f_sequence.kind'='sequence','fields.f_sequence.start'='1','fields.f_sequence.end'='1000','fields.f_random.min'='1','fields.f_random.max'='1000','fields.f_random_str.length'='10')", "DROP TABLE IF EXISTS print_sink", "CREATE TABLE print_sink(id STRING,user_id STRING,`status` STRING,`str` STRING) WITH ('connector' = 'print')","INSERT INTO print_sink SELECT CAST(f_sequence AS STRING) AS id, CAST(f_random AS STRING) AS user_id, CAST(ts AS STRING) AS status,mysubstr(f_random_str,1,4) AS str FROM source_datagen" ] controller的业务逻辑为 public String executeDefaultSql(String sql) throws Exception { log.info(sql); StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host,port); env.setStateBackend(new HashMapStateBackend()); env.enableCheckpointing(1000); env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage()); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(6); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0); env.getCheckpointConfig().setExternalizedCheckpointCleanup( CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION ); EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,bsSettings); List jars = new ArrayList<>(); jars.add("hdfs:///xxx/file/function/flink/d78721345f45422da269fa0411127eda0453523812.jar"); try { for (String jar : jars) { log.info(jar); EnvUtil.loadJar(URLUtil.url(jar)); } tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS,jars); tableEnv.executeSql("CREATE FUNCTION mysubstr AS 'com.example.udf.SubStr' LANGUAGE JAVA").print(); log.info("完成加载hdfs上的udf"); }catch (Exception e){ e.printStackTrace(); } List list = JSON.parseArray(sql, String.class); TableResult result = null; for (String s : list) { result = tableEnv.executeSql(s); } String jobId = ""; log.info(result.getResultKind().name()); if (result.getJobClient().isPresent()) { log.info(JSON.toJSONString(result.getJobClient().get().getJobStatus())); jobId = result.getJobClient().get().getJobID().toString(); log.info("jobId:"+jobId); }else{ result.print(); } return jobId; } 报错信息为 2022-04-21 10:12:37 org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.udf.SubStr ClassLoader info: URL ClassLoader: Class not resolvable through given classloader. 
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569) at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassNotFoundException: com.example.udf.SubStr at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) at java.lang.ClassLoader.loadCla
Re: flink run 提交任务到yarn 报Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
Hi! -C 要求提供的路径必须在集群的所有节点都能访问。如果提供的是 file://,那么集群的所有节点的这个路径下都要有对应文件。可以把文件放在 hdfs 上然后 -C 指定 hdfs:// 试试。 Wayne <1...@163.com> 于2021年8月30日周一 下午3:13写道: > 我的执行命令是 > flink run -m yarn-cluster -yd -yjm 1024m -ytm 1024m -ys 1 -ynm xxx -C > file:///xxx/flink-connector-kafka_2.12-1.12.2.jar -C > file:///xxx/flink-sql-avro-1.12.2.jar ... > 提交到生产集群上提示 > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load > user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer > ClassLoader info: URL ClassLoader: > Class not resolvable through given classloader. > at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:331) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:145) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:517) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292] > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer > at java.net.URLClassLoader.findClass(URLClassLoader.java:382) > ~[?:1.8.0_292] > at java.lang.ClassLoader.loadClass(ClassLoader.java:418) > ~[?:1.8.0_292] > at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at > org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at > 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at java.lang.ClassLoader.loadClass(ClassLoader.java:351) > ~[?:1.8.0_292] > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at java.lang.Class.forName0(Native Method) ~[?:1.8.0_292] > at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_292] > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) > ~[?:1.8.0_292] > at > java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) > ~[?:1.8.0_292] > at > 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) > ~[flink-dist_2.12-1.12.2.jar:1.12.2] > at > org.apache.flink.util.I
flink run 提交任务到yarn 报Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
我的执行命令是 flink run -m yarn-cluster -yd -yjm 1024m -ytm 1024m -ys 1 -ynm xxx -C file:///xxx/flink-connector-kafka_2.12-1.12.2.jar -C file:///xxx/flink-sql-avro-1.12.2.jar ... 提交到生产集群上提示 org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer ClassLoader info: URL ClassLoader: Class not resolvable through given classloader. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:331) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:145) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:517) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292] Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_292] at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_292] at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_292] at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at java.lang.Class.forName0(Native Method) ~[?:1.8.0_292] at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_292] at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986) ~[?:1.8.0_292] at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850) ~[?:1.8.0_292] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160) ~[?:1.8.0_292] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) ~[?:1.8.0_292] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) ~[?:1.8.0_292] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) ~[?:1.8.0_292] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) ~[?:1.8.0_292] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) ~[?:1.8.0_292] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) ~[?:1.8.0_292] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) ~[?:1.8.0_292] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) ~[?:1.8.0_292] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) ~[?:1.8.0_292] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[?:1.8.0_292] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[?:1.8.0_292] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.12-1.12.2.jar:1.12.2] at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:317) ~[flink-dist_2.12-1.12.2.jar:1.12.2] ... 6 more 2 任务可以提交到yarn上,说明 -C 这个命令起作用了,但是为啥yarn上报错,是我这个命令写的不对么
Cannot load user class
Hi all, 我们对kafka connector flink 进行了扩展,flink-connector-kafka-base包中新增了类,在功能迁移到1.11.1中,但是sql-cli中测试运行时报了无法加载类的异常,1.10.1版本是ok的,是不是1.11版本对类加载做了什么改动? 求大佬解惑,谢谢 异常如下: Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.DelayFlatMapFunction ClassLoader info: URL ClassLoader: file: '/var/folders/kl/qps350ws2kvb88r5knrp5flmgn/T/blobStore-c78573c7-5ebc-4d87-82ca-ce2647c79b6e/job_4d0e3bb67ab668416d108636ac6b8510/blob_p-a6a403094205e2501dc8790c04f2d21533c7af83-09bfececb34428dbb8e6c2d2eef9c5c7' (valid JAR) Class not resolvable through given classloader. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:155) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at java.lang.Thread.run(Thread.java:834) ~[?:?] Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.DelayFlatMapFunction at java.net.URLClassLoader.findClass(URLClassLoader.java:471) ~[?:?] at java.lang.ClassLoader.loadClass(ClassLoader.java:588) ~[?:?] 
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at java.lang.ClassLoader.loadClass(ClassLoader.java:521) ~[?:?] at java.lang.Class.forName0(Native Method) ~[?:?] at java.lang.Class.forName(Class.java:398) ~[?:?] at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1965) ~[?:?] at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1851) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2139) ~[?:?] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434) ~[?:?] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166) ~[?:?] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434) ~[?:?] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166) ~[?:?] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) ~[?:?] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482) ~[?:?] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440) ~[?:?] 
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.1.jar:1.11.1