因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试
________________________________ [email protected] 发件人: 郑 洁锋<mailto:[email protected]> 发送时间: 2020-01-16 14:24 收件人: user-zh<mailto:[email protected]> 主题: Re: Re: MiniCluster问题 这是完整的到启动的代码 public class ClusterClientFactory { public static ClusterClient createClusterClient(Options launcherOptions) throws Exception { String mode = launcherOptions.getMode(); if(mode.equals(ClusterMode.standalone.name())) { return createStandaloneClient(launcherOptions); } else if(mode.equals(ClusterMode.yarn.name())) { return createYarnClient(launcherOptions,mode); } throw new IllegalArgumentException("Unsupported cluster client type: "); } public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception { String flinkConfDir = launcherOptions.getFlinkconf(); Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir); MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder(); configBuilder.setConfiguration(config); MiniCluster miniCluster = new MiniCluster(configBuilder.build()); MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster); LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo(); InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress()); config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName()); config.setInteger(JobManagerOptions.PORT, address.getPort()); clusterClient.setDetached(true); return clusterClient; } 启动类中: ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions); clusterClient.run(program, 1); clusterClient.shutdown(); ________________________________ [email protected] 发件人: tison<mailto:[email protected]> 发送时间: 2020-01-16 13:31 收件人: user-zh<mailto:[email protected]> 主题: Re: Re: MiniCluster问题 MiniCluster miniCluster = new MiniCluster(configBuilder.build()); miniCluster.start(); MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster) ; Best, tison. tison <[email protected]> 于2020年1月16日周四 下午1:30写道: > 跟集群无关 > Best, > tison. > > > tison <[email protected]> 于2020年1月16日周四 下午1:30写道: > >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 >> >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 >> >> Best, >> tison. >> >> >> 郑 洁锋 <[email protected]> 于2020年1月16日周四 下午1:18写道: >> >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? >>> 我是通过bin/start-cluster.sh启动的flink standalone集群 >>> >>> >>> ________________________________ >>> [email protected] >>> >>> 发件人: tison<mailto:[email protected]> >>> 发送时间: 2020-01-16 12:39 >>> 收件人: user-zh<mailto:[email protected]> >>> 主题: Re: MiniCluster问题 >>> 你 MiniCluster 要 start 啊(x >>> >>> Best, >>> tison. >>> >>> >>> 郑 洁锋 <[email protected]> 于2020年1月16日周四 上午11:38写道: >>> >>> > MiniCluster代码执行过程中报错: >>> > >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for >>> further details. >>> > Exception in thread "main" java.lang.IllegalStateException: >>> MiniCluster is not yet running. >>> > at >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) >>> > at >>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) >>> > at >>> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) >>> > at >>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) >>> > at >>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) >>> > at >>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) >>> > >>> > 报错段代码如下: >>> > >>> > Configuration config = >>> GlobalConfiguration.loadConfiguration(flinkConfDir); >>> > MiniClusterConfiguration.Builder configBuilder = new >>> MiniClusterConfiguration.Builder(); >>> > configBuilder.setConfiguration(config); >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); >>> > MiniClusterClient clusterClient = new MiniClusterClient(config, >>> miniCluster); >>> > >>> > 其中flinkConfDir为/opt/flink/conf >>> > >>> > >>> > flink standalone HA集群信息如下: >>> > ------------------------------ >>> > [email protected] >>> > >>> > >>> > >>> >>
