There isn't much to go on here; my first instinct is that the YARN submission settings are not configured correctly. If you're interested, take a look at https://github.com/hortonworks/simple-yarn-app
to get a clear picture of how a YARN app works;
Coming back to your underlying goal: you want to build a job-hosting platform. A simple, conventional approach is to assemble and wrap the parameters yourself, then submit jobs by invoking the relevant
$FLINK_HOME/bin/flink run command.
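A minimal sketch of what that wrapping could look like; the `flinkHome` path, the main class, and the jar path below are hypothetical placeholders, and a real platform would execute the assembled command with `ProcessBuilder` and parse the YARN application id out of the output:

```java
import java.util.ArrayList;
import java.util.List;

public class FlinkRunCommandBuilder {

    // Assemble the argument list for a $FLINK_HOME/bin/flink run-application
    // invocation in YARN application mode. All paths/names are illustrative.
    public static List<String> buildCommand(String flinkHome,
                                            String mainClass,
                                            int parallelism,
                                            String userJar) {
        List<String> cmd = new ArrayList<>();
        cmd.add(flinkHome + "/bin/flink");
        cmd.add("run-application");   // application mode
        cmd.add("-t");
        cmd.add("yarn-application");  // deploy target: YARN
        cmd.add("-c");
        cmd.add(mainClass);           // entry class of the user jar
        cmd.add("-p");
        cmd.add(String.valueOf(parallelism));
        cmd.add(userJar);             // user jar, e.g. an hdfs:// path
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = buildCommand(
                "/opt/flink", "com.example.MyJob", 2, "hdfs:///jobs/my-job.jar");
        // The platform would hand this list to ProcessBuilder.
        System.out.println(String.join(" ", cmd));
    }
}
```

This keeps all the classpath and jar-shipping logic inside the Flink CLI, which is exactly the part that is going wrong with the raw API approach.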
Your current approach seems to have gone a bit off track, although your scenario may have requirements I'm not aware of.
As for scheduling platforms, Apache DolphinScheduler is another good option: an excellent open-source project from China with comprehensive features. See
https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/flink-call.html
Hope this helps.
--
Best regards,
Mang Zhang
On 2022-04-08 20:56:33, "周涛" <06160...@163.com> wrote:
>Many thanks for the reply, Mang Zhang.
>The full contents of the AM log are below; the key message is still that the YarnApplicationClusterEntryPoint class cannot be found:
>
>Log Type: jobmanager.err
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 107
>
>Error: Could not find or load main class
>org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>
>
>Log Type: jobmanager.out
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 0
>
>Log Type: prelaunch.err
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 0
>
>Log Type: prelaunch.out
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 70
>
>Setting up env variables
>Setting up job resources
>Launching container
>
>
>In my code I have already registered all the jars Flink depends on from its lib directory with YARN. The jars were uploaded to HDFS ahead of time, and while debugging I can see the jar information being picked up, including flink-dist.jar.
>The error is thrown when the JM starts after the job is submitted to YARN. I still haven't found the cause.
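Since the failure is specifically a missing class, one quick local sanity check (a stdlib-only sketch; the jar path would be a local copy of the uploaded flink-dist.jar, a placeholder here) is to confirm the jar really contains the entry point class before blaming the registration step:

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class EntryPointCheck {

    // Check whether a jar on local disk contains the given class.
    // A "Could not find or load main class" error from the AM usually means
    // the jar holding that class never made it onto the AM's classpath.
    public static boolean containsClass(String jarPath, String className)
            throws IOException {
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entry) != null;
        }
    }
}
```

If the class is present in the jar, the problem is more likely how the AM launch classpath is assembled from the provided lib dirs than the jar contents themselves.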
>
>Also, what do you mean by the tools Flink provides? With this work I mainly want to build my own management platform for submitting and managing jobs.
>
>On 2022-04-08 17:57:48, "Mang Zhang" <zhangma...@163.com> wrote:
>
>From this exception it looks like the AM failed to start after the job was submitted to the YARN cluster; check the AM log on the cluster,
>for example at the URL given in the log: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
>Also, judging from your code, you are submitting the job through the YARN API, which means you have to register every jar the job depends on with YARN yourself,
>for example the jars under Flink's lib directory, using YarnLocalResourceDescriptor
>to register the dependencies. The process can be fairly involved; you need to understand YARN's deployment mechanism first.
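As a rough illustration of the first step of that registration (stdlib only; the actual upload and `LocalResource` registration is Flink/YARN specific and is only noted in a comment), collecting every jar under a local lib directory might look like:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class LibJarCollector {

    // Collect every *.jar under a local Flink lib directory. On the raw
    // YARN-API submission path, each of these (plus flink-dist.jar) must
    // then be uploaded and registered with YARN as a LocalResource so the
    // AM container can put them on its classpath before it starts.
    public static List<Path> collectJars(Path libDir) throws IOException {
        List<Path> jars = new ArrayList<>();
        try (DirectoryStream<Path> stream =
                     Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : stream) {
                jars.add(jar);
            }
        }
        return jars;
    }
}
```

Missing even one of these jars from the AM classpath is enough to produce a "Could not find or load main class" failure at container launch.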
>
>
>I'd suggest submitting jobs with the commands and tools Flink already provides; the engine has all of these steps wrapped up for you.
>
>Best wishes
>
>--
>
>Best regards,
>Mang Zhang
>
>On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>>
>>Hi,
>> While testing Flink job submission via the Java API, I ran into some problems and would like to ask for advice:
>> Flink version: 1.14.4
>> Hadoop version: 3.0.0-cdh6.2.1
>> Application mode; submitting with the CLI works fine, submitting via the API fails.
>> YARN log from the failed submission:
>> LogType:jobmanager.err
>> LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>> LogLength:107
>> LogContents:
>> Error: Could not find or load main class
>> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>> End of LogType:jobmanager.err
>>Here is the code:
>>
>>
>>System.setProperty("HADOOP_USER_NAME", "hdfs");
>>
>>// Flink's local configuration directory, used to load the Flink configuration
>>String configurationDirectory =
>>        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>>
>>// HDFS directory holding the Flink cluster jars
>>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>>// user jar
>>String userJarPath =
>>        "hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>>String flinkDistJar = "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>>
>>YarnClientService yarnClientService = new YarnClientService();
>>// create the YarnClient
>>YarnClient yarnClient = yarnClientService.getYarnClient();
>>yarnClient.start();
>>
>>// for the logs; without this they are not visible
>>YarnClusterInformationRetriever clusterInformationRetriever =
>>        YarnClientYarnClusterInformationRetriever.create(yarnClient);
>>
>>// load the Flink configuration
>>Configuration flinkConfiguration =
>>        GlobalConfiguration.loadConfiguration(configurationDirectory);
>>
>>flinkConfiguration.setString(
>>        ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>>        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>>
>>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
>>
>>flinkConfiguration.set(PipelineOptions.JARS,
>>        Collections.singletonList(userJarPath));
>>
>>Path remoteLib = new Path(flinkLibs);
>>flinkConfiguration.set(
>>        YarnConfigOptions.PROVIDED_LIB_DIRS,
>>        Collections.singletonList(remoteLib.toString()));
>>
>>flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);
>>
>>// application mode
>>flinkConfiguration.set(
>>        DeploymentOptions.TARGET,
>>        YarnDeploymentTarget.APPLICATION.getName());
>>
>>// yarn application name
>>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");
>>
>>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
>>
>>// arguments for the user jar and its main class
>>ApplicationConfiguration appConfig =
>>        new ApplicationConfiguration(new String[]{}, "com.zt.FlinkTest1");
>>
>>final int jobManagerMemoryMB =
>>        JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>>                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
>>                .getTotalProcessMemorySize()
>>                .getMebiBytes();
>>final int taskManagerMemoryMB =
>>        TaskExecutorProcessUtils.processSpecFromConfig(
>>                        TaskExecutorProcessUtils
>>                                .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
>>                                        flinkConfiguration,
>>                                        TaskManagerOptions.TOTAL_PROCESS_MEMORY))
>>                .getTotalProcessMemorySize()
>>                .getMebiBytes();
>>
>>ClusterSpecification clusterSpecification =
>>        new ClusterSpecification.ClusterSpecificationBuilder()
>>                .setMasterMemoryMB(jobManagerMemoryMB)
>>                .setTaskManagerMemoryMB(taskManagerMemoryMB)
>>                .setSlotsPerTaskManager(
>>                        flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
>>                .createClusterSpecification();
>>
>>YarnClusterDescriptor yarnClusterDescriptor =
>>        new YarnClusterDescriptor(
>>                flinkConfiguration,
>>                (YarnConfiguration) yarnClient.getConfig(),
>>                yarnClient,
>>                clusterInformationRetriever,
>>                true);
>>
>>try {
>>    ClusterClientProvider<ApplicationId> clusterClientProvider =
>>            yarnClusterDescriptor.deployApplicationCluster(
>>                    clusterSpecification, appConfig);
>>
>>    ClusterClient<ApplicationId> clusterClient =
>>            clusterClientProvider.getClusterClient();
>>
>>    ApplicationId applicationId = clusterClient.getClusterId();
>>    String webInterfaceURL = clusterClient.getWebInterfaceURL();
>>    log.error("applicationId is {}", applicationId);
>>    log.error("webInterfaceURL is {}", webInterfaceURL);
>>
>>    // tear down
>>    // yarnClusterDescriptor.killCluster(applicationId);
>>} catch (Exception e) {
>>    log.error(e.getMessage(), e);
>>} finally {
>>    // yarnClient.close();
>>}
>>
>>Here is part of the submission log:
>> 09:24:01.288 [IPC Parameter Sending Thread #0] DEBUG
>> org.apache.hadoop.ipc.Client - IPC Client (1948810915) connection to
>> bigdata-beta2/192.168.15.185:8032 from hdfs sending #31
>> org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.getApplicationReport
>>09:24:01.305 [IPC Client (1948810915) connection to
>>bigdata-beta2/192.168.15.185:8032 from hdfs] DEBUG
>>org.apache.hadoop.ipc.Client - IPC Client (1948810915) connection to
>>bigdata-beta2/192.168.15.185:8032 from hdfs got value #31
>>09:24:01.305 [main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call:
>>getApplicationReport took 18ms
>>09:24:01.305 [main] DEBUG org.apache.flink.yarn.YarnClusterDescriptor -
>>Application State: FAILED
>>09:24:01.321 [main] ERROR com.yaduo.flink.DeployTest - Couldn't deploy Yarn
>>Application Cluster
>>org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
>>deploy Yarn Application Cluster
>>at
>>org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:466)
>>at com.yaduo.flink.DeployTest.main(DeployTest.java:122)
>>Caused by:
>>org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN
>>application unexpectedly switched to state FAILED during deployment.
>>Diagnostics from YARN: Application application_1647875542261_0079 failed 2
>>times due to AM Container for appattempt_1647875542261_0079_000002 exited
>>with exitCode: 1
>>Failing this attempt.Diagnostics: [2022-04-08 09:24:00.480]Exception from
>>container-launch.
>>Container id: container_1647875542261_0079_02_000002
>>Exit code: 1
>>
>>
>>[2022-04-08 09:24:00.481]Container exited with a non-zero exit code 1. Error
>>file: prelaunch.err.
>>Last 4096 bytes of prelaunch.err :
>>
>>
>>[2022-04-08 09:24:00.481]Container exited with a non-zero exit code 1. Error
>>file: prelaunch.err.
>>Last 4096 bytes of prelaunch.err :
>>
>>
>>For more detailed output, check the application tracking page:
>>http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079 Then
>>click on links to logs of each attempt.
>>. Failing the application.
>>If log aggregation is enabled on your cluster, use this command to further
>>investigate the issue:
>>yarn logs -applicationId application_1647875542261_0079
>>at
>>org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1219)
>>at
>>org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:607)
>>at
>>org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:459)
>>... 1 common frames omitted
>>09:24:01.321 [Thread-9] INFO org.apache.flink.yarn.YarnClusterDescriptor -
>>Cancelling deployment from Deployment Failure Hook
>>09:24:01.321 [Thread-9] INFO org.apache.hadoop.yarn.client.RMProxy -
>>Connecting to ResourceManager at bigdata-beta2/192.168.15.185:8032
>>09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.security.UserGroupInformation
>>- PrivilegedAction as:hdfs (auth:SIMPLE)
>>from:org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:147)
>>09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.yarn.ipc.YarnRPC - Creating
>>YarnRPC for null
>>09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC -
>>Creating a HadoopYarnProtoRpc proxy for protocol interface
>>org.apache.hadoop.yarn.api.ApplicationClientProtocol
>>09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.ipc.Client - getting client
>>out of cache: org.apache.hadoop.ipc.Client@6692b6c6
>>09:24:01.321 [Thread-9] DEBUG org.apache.hadoop.service.AbstractService -
>>Service org.apache.hadoop.yarn.client.api.impl.YarnClientImpl is started
>>09:24:01.321 [Thread-9] INFO org.apache.flink.yarn.YarnClusterDescriptor -
>>Killing YARN application
>>09:24:01.321 [Thread-4] DEBUG org.apache.hadoop.util.ShutdownHookManager -
>>ShutdownHookManger complete shutdown.
>>
>> The above describes the problem. I have been troubleshooting for several days without finding the cause, so I am asking the experts here; any guidance would be greatly appreciated.
>> Looking forward to your reply!
>>
>>
>>Best wishes