Hi 周涛,

Mang's suggestion is a good one. Wrapping the command line and assembling the submission parameters is the more common implementation; unless you have special requirements I'd recommend going with his approach, which among other things generally makes later Flink version upgrades easier.

If for some reason you want to keep your current approach: your code is already quite close to the code in YARNApplicationITCase, but note that the ITCase runs locally and sets up the HADOOP- and Flink-related environment variables through methods such as YARNApplicationITCase#startYARNWithConfig.

For a real job, on the server side the command YARN uses inside the AM to launch the Flink job looks roughly like this:

/bin/bash -c /usr/lib/jvm/java-1.8.0/bin/java -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b …

I suggest you compare your setup against startYARNWithConfig and check whether some environment variable is not being set, which would leave the AM with a wrong classpath (or other environment variables) and in turn cause the class not to be found.
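As a quick client-side sanity check, something like the following minimal sketch can be run (or inlined right before deployApplicationCluster) to print the environment variables the Hadoop/Flink client side typically relies on. The exact list of variable names is an assumption about a typical setup, not something Flink prescribes; adjust it to whatever your cluster actually uses.

import java.util.Arrays;
import java.util.List;

/** Prints the environment variables a Hadoop/Flink client setup typically relies on. */
public class ClientEnvCheck {
    public static void main(String[] args) {
        // NOTE: this list is only an assumption based on a common client setup;
        // adapt it to the variables your own environment actually depends on.
        List<String> vars = Arrays.asList(
                "HADOOP_HOME", "HADOOP_CONF_DIR", "HADOOP_CLASSPATH",
                "YARN_CONF_DIR", "FLINK_HOME", "FLINK_CONF_DIR");
        for (String name : vars) {
            System.out.println(name + " = " + System.getenv(name));
        }
    }
}

If HADOOP_CONF_DIR or HADOOP_CLASSPATH comes back empty on the machine you submit from, that is at least a hint that your client is not seeing the same Hadoop configuration the Flink CLI would use. A second sketch, appended after the quoted thread at the bottom of this mail, double-checks that the HDFS paths registered in your code (YarnConfigOptions.PROVIDED_LIB_DIRS and YarnConfigOptions.FLINK_DIST_JAR) actually exist.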
From: 周涛 <06160...@163.com>
Date: Friday, April 8, 2022, 8:57 PM
To: Mang Zhang <zhangma...@163.com>
Cc: user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: Re: Re: Error when submitting via the YARN API

Many thanks to Mang Zhang for the reply.

The full content of the AM log is below; the key message is still that the YarnApplicationClusterEntryPoint class cannot be found:

Log Type: jobmanager.err
Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
Log Length: 107
Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint

Log Type: jobmanager.out
Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
Log Length: 0

Log Type: prelaunch.err
Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
Log Length: 0

Log Type: prelaunch.out
Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
Log Length: 70
Setting up env variables
Setting up job resources
Launching container

In my code I have already registered all of the lib jars Flink depends on with YARN; the jars were uploaded to HDFS in advance, and while debugging I can see that the jar information is picked up, including flink-dist.jar. After submitting to YARN, the error is thrown when the JM starts, and I have not been able to find the cause.

Also, which tools provided by Flink do you mean? The point of this work is to build my own management platform for submitting and managing jobs.

On 2022-04-08 17:57:48, "Mang Zhang" <zhangma...@163.com> wrote:

This exception looks like the submission reached the YARN cluster and then failed while starting the AM. You can check the AM log on the cluster, for example via the link shown in the log:
http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079

Also, from your code you are submitting the job through the YARN API directly. In that case you have to register every jar the job depends on with YARN yourself, for example the jars under the Flink lib directory, registering the dependencies with YarnLocalResourceDescriptor and so on. The process can be fairly involved, and you need to understand YARN's application deployment mechanism first. My suggestion is still to submit jobs with the commands and tools Flink already provides; the engine has all of these steps wrapped up for you.

Best wishes

--

Best regards,
Mang Zhang

On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>
> Hi,
> While testing job submission via the Java API I ran into some problems and would like to ask for advice:
> Flink version: 1.14.4
> Hadoop version: 3.0.0-cdh6.2.1
> Application mode: submitting with the command line works fine; submitting via the API fails.
> YARN log for the failed submission:
> LogType:jobmanager.err
> LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
> LogLength:107
> LogContents:
> Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
> End of LogType:jobmanager.err
>
> The code is as follows:
>
> System.setProperty("HADOOP_USER_NAME", "hdfs");
> // local Flink configuration directory, used to load the Flink configuration
> String configurationDirectory =
>         "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
> // HDFS directory holding the Flink cluster jars
> String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
> // user jar
> String userJarPath =
>         "hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
> String flinkDistJar = "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>
> YarnClientService yarnClientService = new YarnClientService();
> // create the YarnClient
> YarnClient yarnClient = yarnClientService.getYarnClient();
> yarnClient.start();
>
> // for the logs; without this no logs are visible
> YarnClusterInformationRetriever clusterInformationRetriever =
>         YarnClientYarnClusterInformationRetriever.create(yarnClient);
>
> // load the Flink configuration
> Configuration flinkConfiguration =
>         GlobalConfiguration.loadConfiguration(configurationDirectory);
> flinkConfiguration.setString(
>         ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>         "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
> flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
> flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));
>
> Path remoteLib = new Path(flinkLibs);
> flinkConfiguration.set(
>         YarnConfigOptions.PROVIDED_LIB_DIRS,
>         Collections.singletonList(remoteLib.toString()));
> flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);
>
> // use application mode
> flinkConfiguration.set(
>         DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
>
> // yarn application name
> flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");
>
> YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
>
> // program arguments and main class of the user jar
> ApplicationConfiguration appConfig =
>         new ApplicationConfiguration(new String[]{}, "com.zt.FlinkTest1");
>
> final int jobManagerMemoryMB =
>         JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>                         flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
>                 .getTotalProcessMemorySize()
>                 .getMebiBytes();
> final int taskManagerMemoryMB =
>         TaskExecutorProcessUtils.processSpecFromConfig(
>                         TaskExecutorProcessUtils
>                                 .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
>                                         flinkConfiguration, TaskManagerOptions.TOTAL_PROCESS_MEMORY))
>                 .getTotalProcessMemorySize()
>                 .getMebiBytes();
> ClusterSpecification clusterSpecification =
>         new ClusterSpecification.ClusterSpecificationBuilder()
>                 .setMasterMemoryMB(jobManagerMemoryMB)
>                 .setTaskManagerMemoryMB(taskManagerMemoryMB)
>                 .setSlotsPerTaskManager(flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
>                 .createClusterSpecification();
> YarnClusterDescriptor yarnClusterDescriptor =
>         new YarnClusterDescriptor(
>                 flinkConfiguration,
>                 (YarnConfiguration) yarnClient.getConfig(),
>                 yarnClient,
>                 clusterInformationRetriever,
>                 true);
>
> try {
>     ClusterClientProvider<ApplicationId> clusterClientProvider =
>             yarnClusterDescriptor.deployApplicationCluster(clusterSpecification, appConfig);
>
>     ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
>
>     ApplicationId applicationId = clusterClient.getClusterId();
>     String webInterfaceURL = clusterClient.getWebInterfaceURL();
>     log.error("applicationId is {}", applicationId);
>     log.error("webInterfaceURL is {}", webInterfaceURL);
>
>     // shut down
>     // yarnClusterDescriptor.killCluster(applicationId);
> } catch (Exception e) {
>     log.error(e.getMessage(), e);
> } finally {
>     // yarnClient.close();
> }
>
> Part of the submission log:
> 09:24:01.288 [IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1948810915) connection to bigdata-beta2/192.168.15.185:8032 from hdfs sending #31 org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.getApplicationReport
> 09:24:01.305 [IPC Client (1948810915) connection to bigdata-beta2/192.168.15.185:8032 from hdfs] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1948810915) connection to bigdata-beta2/192.168.15.185:8032 from hdfs got value #31
> 09:24:01.305 [main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getApplicationReport took 18ms
> 09:24:01.305 [main] DEBUG org.apache.flink.yarn.YarnClusterDescriptor - Application State: FAILED
> 09:24:01.321 [main] ERROR com.yaduo.flink.DeployTest - Couldn't deploy Yarn Application Cluster
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster
>     at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:466)
>     at com.yaduo.flink.DeployTest.main(DeployTest.java:122)
> Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1647875542261_0079 failed 2 times due to AM Container for appattempt_1647875542261_0079_000002 exited with exitCode: 1
> Failing this attempt.Diagnostics: [2022-04-08 09:24:00.480]Exception from container-launch.
> Container id: container_1647875542261_0079_02_000002
> Exit code: 1
>
> [2022-04-08 09:24:00.481]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
> Last 4096 bytes of prelaunch.err :
>
> [2022-04-08 09:24:00.481]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
> Last 4096 bytes of prelaunch.err :
>
> For more detailed output, check the application tracking page: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079 Then click on links to logs of each attempt.
> . Failing the application.
> If log aggregation is enabled on your cluster, use this command to further investigate the issue:
> yarn logs -applicationId application_1647875542261_0079
>     at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1219)
>     at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:607)
>     at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:459)
>     ... 1 common frames omitted
> 09:24:01.321 [Thread-9] INFO org.apache.flink.yarn.YarnClusterDescriptor - Cancelling deployment from Deployment Failure Hook
> 09:24:01.321 [Thread-9] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at bigdata-beta2/192.168.15.185:8032
> 09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:hdfs (auth:SIMPLE) from:org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:147)
> 09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.yarn.ipc.YarnRPC - Creating YarnRPC for null
> 09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC - Creating a HadoopYarnProtoRpc proxy for protocol interface org.apache.hadoop.yarn.api.ApplicationClientProtocol
> 09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.ipc.Client - getting client out of cache: org.apache.hadoop.ipc.Client@6692b6c6
> 09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.service.AbstractService - Service org.apache.hadoop.yarn.client.api.impl.YarnClientImpl is started
> 09:24:01.321 [Thread-9] INFO org.apache.flink.yarn.YarnClusterDescriptor - Killing YARN application
> 09:24:01.321 [Thread-4] DEBUG org.apache.hadoop.util.ShutdownHookManager - ShutdownHookManger complete shutdown.
>
> That is the problem as far as I can describe it. I have been troubleshooting for several days without finding the cause, so I am turning to the experts here; any guidance would be much appreciated.
> Looking forward to your reply!
>
> Best regards
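As mentioned above the quoted thread, here is the second sketch. It only reuses the two HDFS paths from the code in the quoted message; everything else is illustrative. It lists the directory registered via YarnConfigOptions.PROVIDED_LIB_DIRS and checks that the flink-dist jar path registered via YarnConfigOptions.FLINK_DIST_JAR exists, since the AM can only find the entrypoint class if those resources actually end up on its classpath.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Lists the provided-lib directory and checks the flink-dist jar path on HDFS. */
public class ProvidedLibCheck {
    public static void main(String[] args) throws Exception {
        // Paths copied from the code in the quoted message; adjust to your environment.
        String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
        String flinkDistJar = "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";

        // Assumes core-site.xml/hdfs-site.xml are on the classpath so that the
        // "nameservice1" nameservice can be resolved.
        Configuration hadoopConf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(flinkLibs), hadoopConf);

        System.out.println("flink-dist jar exists: " + fs.exists(new Path(flinkDistJar)));
        for (FileStatus status : fs.listStatus(new Path(flinkLibs))) {
            System.out.println("provided lib: " + status.getPath().getName());
        }
    }
}

If the flink-dist path is missing or the lib listing does not contain the jars you expect, that would be the first thing to fix before digging further into the environment variables.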