This exception looks like it is thrown while the AM is starting up, after the job has already been submitted to the YARN cluster; check the AM log on the cluster,
e.g. via the tracking URL shown in your log: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
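If log aggregation is enabled on your cluster, you can also pull the AM log from the command line (this is the same command the YARN diagnostics in your own log suggest):

    yarn logs -applicationId application_1647875542261_0079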
Also, judging from your code, you are submitting the job through YARN's API directly, which means you have to register every jar the job depends on with YARN yourself,
e.g. the jars under Flink's lib directory, using YarnLocalResourceDescriptor
to register the dependencies and so on. The process can be fairly convoluted, and you need to understand YARN's application deployment mechanism first;
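Before going down that road, one quick sanity check (my assumption, based on the paths in your code): since the AM cannot even load YarnApplicationClusterEntryPoint, which usually means flink-dist is missing from its classpath, verify that the provided lib dir and flink-dist jar your configuration points at actually exist and are readable on HDFS:

    hdfs dfs -ls hdfs://nameservice1/flink/jar/libs/flink-dist.jar
    hdfs dfs -ls hdfs://nameservice1/flink/jar/libs/lib/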
My advice is still to submit the job with the commands and tooling that Flink already provides; the engine has all of these steps wrapped up for you.
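For reference, a rough CLI equivalent of your code (a sketch using the paths and main class from your mail; double-check the flags against the Flink 1.14 YARN docs):

    ./bin/flink run-application -t yarn-application \
        -Dyarn.provided.lib.dirs="hdfs://nameservice1/flink/jar/libs/lib/" \
        -c com.zt.FlinkTest1 \
        hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar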
--
Best regards,
Mang Zhang
On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>Hi,
> While testing Flink job submission via the Java API I ran into some problems and would like to ask for advice:
> Flink version: 1.14.4
> Hadoop version: 3.0.0-cdh6.2.1
> Application mode: submitting with the CLI works fine, but submitting via the API fails.
> YARN log from the failed submission:
> LogType:jobmanager.err
> LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
> LogLength:107
> LogContents:
> Error: Could not find or load main class
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
> End of LogType:jobmanager.err
>Here is the code:
>
>
> System.setProperty("HADOOP_USER_NAME", "hdfs");
>//flink的本地配置目录,为了得到flink的配置
>String configurationDirectory =
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>
>//存放flink集群相关的jar包目录
>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>//用户jar
>String userJarPath =
>"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
> String flinkDistJar =
> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>
> YarnClientService yarnClientService = new YarnClientService();
> //yarnclient创建
> YarnClient yarnClient = yarnClientService.getYarnClient();
> yarnClient.start();
>
>// 设置日志的,没有的话看不到日志
>YarnClusterInformationRetriever clusterInformationRetriever =
>YarnClientYarnClusterInformationRetriever
> .create(yarnClient);
>
>//获取flink的配置
>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
> configurationDirectory);
>
>
> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>
> flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS,
> true);
>
> flinkConfiguration.set(PipelineOptions.JARS,
> Collections.singletonList(userJarPath));
>
> Path remoteLib = new Path(flinkLibs);
> flinkConfiguration.set(
> YarnConfigOptions.PROVIDED_LIB_DIRS,
> Collections.singletonList(remoteLib.toString()));
>
> flinkConfiguration.set(
> YarnConfigOptions.FLINK_DIST_JAR,
> flinkDistJar);
>
>// 设置为application模式
>flinkConfiguration.set(
> DeploymentOptions.TARGET,
> YarnDeploymentTarget.APPLICATION.getName());
>
>// yarn application name
>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME,
>"flink-application");
>
> YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration,
> configurationDirectory);
>
>// 设置用户jar的参数和主类
>ApplicationConfiguration appConfig = new ApplicationConfiguration(new
>String[]{}, "com.zt.FlinkTest1");
>
>
>final int jobManagerMemoryMB =
>
> JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
> flinkConfiguration,
> JobManagerOptions.TOTAL_PROCESS_MEMORY)
> .getTotalProcessMemorySize()
> .getMebiBytes();
>final int taskManagerMemoryMB =
> TaskExecutorProcessUtils.processSpecFromConfig(
> TaskExecutorProcessUtils
>
> .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
> flinkConfiguration,
>
> TaskManagerOptions.TOTAL_PROCESS_MEMORY))
> .getTotalProcessMemorySize()
> .getMebiBytes();
> ClusterSpecification clusterSpecification = new
> ClusterSpecification.ClusterSpecificationBuilder()
> .setMasterMemoryMB(jobManagerMemoryMB)
> .setTaskManagerMemoryMB(taskManagerMemoryMB)
>
> .setSlotsPerTaskManager(flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
> .createClusterSpecification();
> YarnClusterDescriptor yarnClusterDescriptor = new
> YarnClusterDescriptor(
> flinkConfiguration,
> (YarnConfiguration) yarnClient.getConfig(),
> yarnClient,
> clusterInformationRetriever,
>true);
>
>try {
> ClusterClientProvider<ApplicationId> clusterClientProvider =
> yarnClusterDescriptor.deployApplicationCluster(
> clusterSpecification,
> appConfig);
>
> ClusterClient<ApplicationId> clusterClient =
> clusterClientProvider.getClusterClient();
>
> ApplicationId applicationId = clusterClient.getClusterId();
> String webInterfaceURL = clusterClient.getWebInterfaceURL();
>log.error("applicationId is {}", applicationId);
>log.error("webInterfaceURL is {}", webInterfaceURL);
>
>// 退出
>// yarnClusterDescriptor.killCluster(applicationId);
>} catch (Exception e) {
>log.error(e.getMessage(), e);
> } finally {
>// yarnClient.close();
>}
>
>Here is part of the submission log:
> 09:24:01.288 [IPC Parameter Sending Thread #0] DEBUG
> org.apache.hadoop.ipc.Client - IPC Client (1948810915) connection to
> bigdata-beta2/192.168.15.185:8032 from hdfs sending #31
> org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.getApplicationReport
>09:24:01.305 [IPC Client (1948810915) connection to
>bigdata-beta2/192.168.15.185:8032 from hdfs] DEBUG
>org.apache.hadoop.ipc.Client - IPC Client (1948810915) connection to
>bigdata-beta2/192.168.15.185:8032 from hdfs got value #31
>09:24:01.305 [main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call:
>getApplicationReport took 18ms
>09:24:01.305 [main] DEBUG org.apache.flink.yarn.YarnClusterDescriptor -
>Application State: FAILED
>09:24:01.321 [main] ERROR com.yaduo.flink.DeployTest - Couldn't deploy Yarn
>Application Cluster
>org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy
>Yarn Application Cluster
>at
>org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:466)
>at com.yaduo.flink.DeployTest.main(DeployTest.java:122)
>Caused by:
>org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN
>application unexpectedly switched to state FAILED during deployment.
>Diagnostics from YARN: Application application_1647875542261_0079 failed 2
>times due to AM Container for appattempt_1647875542261_0079_000002 exited with
> exitCode: 1
>Failing this attempt.Diagnostics: [2022-04-08 09:24:00.480]Exception from
>container-launch.
>Container id: container_1647875542261_0079_02_000002
>Exit code: 1
>
>
>[2022-04-08 09:24:00.481]Container exited with a non-zero exit code 1. Error
>file: prelaunch.err.
>Last 4096 bytes of prelaunch.err :
>
>
>[2022-04-08 09:24:00.481]Container exited with a non-zero exit code 1. Error
>file: prelaunch.err.
>Last 4096 bytes of prelaunch.err :
>
>
>For more detailed output, check the application tracking page:
>http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079 Then
>click on links to logs of each attempt.
>. Failing the application.
>If log aggregation is enabled on your cluster, use this command to further
>investigate the issue:
>yarn logs -applicationId application_1647875542261_0079
>at
>org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1219)
>at
>org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:607)
>at
>org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:459)
>... 1 common frames omitted
>09:24:01.321 [Thread-9] INFO org.apache.flink.yarn.YarnClusterDescriptor -
>Cancelling deployment from Deployment Failure Hook
>09:24:01.321 [Thread-9] INFO org.apache.hadoop.yarn.client.RMProxy -
>Connecting to ResourceManager at bigdata-beta2/192.168.15.185:8032
>09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.security.UserGroupInformation
>- PrivilegedAction as:hdfs (auth:SIMPLE)
>from:org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:147)
>09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.yarn.ipc.YarnRPC - Creating
>YarnRPC for null
>09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC -
>Creating a HadoopYarnProtoRpc proxy for protocol interface
>org.apache.hadoop.yarn.api.ApplicationClientProtocol
>09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.ipc.Client - getting client
>out of cache: org.apache.hadoop.ipc.Client@6692b6c6
>09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.service.AbstractService -
>Service org.apache.hadoop.yarn.client.api.impl.YarnClientImpl is started
>09:24:01.321 [Thread-9] INFO org.apache.flink.yarn.YarnClusterDescriptor -
>Killing YARN application
>09:24:01.321 [Thread-4] DEBUG org.apache.hadoop.util.ShutdownHookManager -
>ShutdownHookManger complete shutdown.
>
> That is the full description of the problem. I have been troubleshooting for several days without finding the cause, so I am turning to the experts here for some guidance. Thank you!
> Looking forward to your reply!
>
>
>Best regards