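I strongly recommend against putting jars that are not part of the Flink binary distribution into yarn.provided.lib.dirs. The jars under it are distributed as YARN public distributed cache resources, cached on the NodeManagers, and shared by all applications.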

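The root cause of your error is that main() runs locally on the client, while the UDF jar exists only on HDFS. The class therefore cannot be found on the client's classpath, and the error is thrown on the client side.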

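There are two ways to fix this:
1. Don't put the UDF jar into the provided lib dirs on HDFS, unless you really intend to share it across many applications.
2. Use application mode [1]. In this mode the user's main() runs on the JobManager, where the jars under the provided lib dirs have already been downloaded and added to the classpath.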
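A sketch of how the two options could look on the command line, reusing the paths from the original submit command. Assumptions: for option 1, udf-test.jar is assumed to be rebuilt as a fat jar that bundles the com.ly.third.udf.* classes; for option 2, the `flink run-application -t yarn-application` syntax follows the Flink YARN documentation [1], and the `yarn.application.name` option is used here in place of `-ynm`. These are alternatives (run one, not both), and neither has been tested against this cluster.

```shell
# Sketch only: paths copied from the original command; adjust to your cluster.
FLINK_HOME=/usr/hdp/flink1.11
PROVIDED_DIR=hdfs://ip:8020/flink-yarn/jars   # passed as yarn.provided.lib.dirs
USER_JAR=/data/bigdata/jars/udf-test.jar      # assumed: fat jar that bundles the UDF classes
MAIN_CLASS=com.ly.common.udf.demo.FlinkUDFDemo

# Option 1 (per-job mode): main() runs on the client, so the UDF classes must be
# inside USER_JAR (or otherwise on the local classpath), not only on HDFS.
"$FLINK_HOME"/bin/flink run -m yarn-cluster -d -ynm udf-test \
  -yD yarn.provided.lib.dirs="$PROVIDED_DIR" \
  -c "$MAIN_CLASS" "$USER_JAR"

# Option 2 (application mode): main() runs on the JobManager, where the jars under
# yarn.provided.lib.dirs have already been localized and put on the classpath.
"$FLINK_HOME"/bin/flink run-application -t yarn-application \
  -Dyarn.provided.lib.dirs="$PROVIDED_DIR" \
  -Dyarn.application.name=udf-test \
  -c "$MAIN_CLASS" "$USER_JAR"
```

With option 1 the provided dir can then hold only Flink's own jars, which is the setup recommended above.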

[1] https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#application-mode

Best,
Yang

On Fri, Dec 25, 2020 at 11:26 AM, zhisheng <[email protected]> wrote:

> hi
>
> Try using -Dyarn.provided.lib.dirs.
>
> Best
> zhisheng
>
> On Tue, Dec 22, 2020 at 4:56 PM, datayangl <[email protected]> wrote:
>
> >
> >
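> > Flink 1.11 on YARN: I uploaded the dependencies under Flink's lib directory, together with my custom-function (UDF) jar, to HDFS in advance, and pointed yarn.provided.lib.dirs at that HDFS path when submitting. My plan was for the program to find and instantiate the custom function classes via reflection, but submission fails: the program cannot find the custom functions.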
> >
> > Submit command: /usr/hdp/flink1.11/bin/flink run -m yarn-cluster -d -ynm udf-test -yD yarn.provided.lib.dirs=hdfs://ip:8020/flink-yarn/jars -c com.ly.common.udf.demo.FlinkUDFDemo /data/bigdata/jars/udf-test.jar
> >
> > Relevant output:
> > 2020-12-22 08:41:11,157 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Dynamic Property set: yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> > 2020-12-22 08:41:11,157 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Dynamic Property set: yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> > -- class path: /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
> >
> > ------------------------------------------------------------
> >  The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: object com.ly.third.udf.flink.SortKey not found.
> >         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> >         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> >         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> >         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> >         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> >         at java.security.AccessController.doPrivileged(Native Method)
> >         at javax.security.auth.Subject.doAs(Subject.java:422)
> >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> >         at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > Caused by: scala.ScalaReflectionException: object com.ly.third.udf.flink.SortKey not found.
> >         at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
> >         at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
> >         at com.ly.common.udf.reflect.RegisterFlinkFunction$.loadFlinkFunction(RegisterFlinkFunction.scala:14)
> >         at com.ly.common.udf.demo.FlinkUDFDemo$.main(FlinkUDFDemo.scala:27)
> >         at com.ly.common.udf.demo.FlinkUDFDemo.main(FlinkUDFDemo.scala)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:498)
> >         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> >         ... 11 more
> >
> >
>
