我是不是可以理解为是local的cluster,本身会去启动一个flink cluster(当前服务器模拟分布式环境),无需单独部署一个flink集群

________________________________
zjfpla...@hotmail.com

发件人: tison<mailto:wander4...@gmail.com>
发送时间: 2020-01-16 14:29
收件人: user-zh<mailto:user-zh@flink.apache.org>
主题: Re: Re: MiniCluster问题
你这完全是把几个概念混在一起了,MiniCluster 就是一个集群,是一个内部的部署模式,跟 standalone
是平行的概念。我看不懂你要干什么,但是就你的问题来说,我上面说了,就是你 MiniCluster new 出来之后没
start。但我不确定这个是不是你要的效果,MiniCluster 和 standalone 是平行的两种东西。

Best,
tison.


郑 洁锋 <zjfpla...@hotmail.com> 于2020年1月16日周四 下午2:27写道:

> 因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试
>
> ________________________________
> zjfpla...@hotmail.com
>
> 发件人: 郑 洁锋<mailto:zjfpla...@hotmail.com>
> 发送时间: 2020-01-16 14:24
> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> 主题: Re: Re: MiniCluster问题
> 这是完整的到启动的代码
>
> public class ClusterClientFactory {
>
>     public static ClusterClient createClusterClient(Options
> launcherOptions) throws Exception {
>         String mode = launcherOptions.getMode();
>         if(mode.equals(ClusterMode.standalone.name())) {
>             return createStandaloneClient(launcherOptions);
>         } else if(mode.equals(ClusterMode.yarn.name())) {
>             return createYarnClient(launcherOptions,mode);
>         }
>         throw new IllegalArgumentException("Unsupported cluster client
> type: ");
>     }
>
>     public static ClusterClient createStandaloneClient(Options
> launcherOptions) throws Exception {
>         String flinkConfDir = launcherOptions.getFlinkconf();
>         Configuration config =
> GlobalConfiguration.loadConfiguration(flinkConfDir);
>         MiniClusterConfiguration.Builder configBuilder = new
> MiniClusterConfiguration.Builder();
>         configBuilder.setConfiguration(config);
>         MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>         MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster);
>         LeaderConnectionInfo connectionInfo =
> clusterClient.getClusterConnectionInfo();
>         InetSocketAddress address =
> AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
>         config.setString(JobManagerOptions.ADDRESS,
> address.getAddress().getHostName());
>         config.setInteger(JobManagerOptions.PORT, address.getPort());
>         clusterClient.setDetached(true);
>         return clusterClient;
>     }
>
>
> 启动类中:
>
> ClusterClient clusterClient =
> ClusterClientFactory.createClusterClient(launcherOptions);
> clusterClient.run(program, 1);
> clusterClient.shutdown();
>
> ________________________________
> zjfpla...@hotmail.com
>
> 发件人: tison<mailto:wander4...@gmail.com>
> 发送时间: 2020-01-16 13:31
> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> 主题: Re: Re: MiniCluster问题
> MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>
> miniCluster.start();
>
>
> MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster)
> ;
>
> Best,
> tison.
>
>
> tison <wander4...@gmail.com> 于2020年1月16日周四 下午1:30写道:
>
> > 跟集群无关
> > Best,
> > tison.
> >
> >
> > tison <wander4...@gmail.com> 于2020年1月16日周四 下午1:30写道:
> >
> >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
> >>
> >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> 郑 洁锋 <zjfpla...@hotmail.com> 于2020年1月16日周四 下午1:18写道:
> >>
> >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
> >>> 我是通过bin/start-cluster.sh启动的flink standalone集群
> >>>
> >>>
> >>> ________________________________
> >>> zjfpla...@hotmail.com
> >>>
> >>> 发件人: tison<mailto:wander4...@gmail.com>
> >>> 发送时间: 2020-01-16 12:39
> >>> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> >>> 主题: Re: MiniCluster问题
> >>> 你 MiniCluster 要 start 啊(x
> >>>
> >>> Best,
> >>> tison.
> >>>
> >>>
> >>> 郑 洁锋 <zjfpla...@hotmail.com> 于2020年1月16日周四 上午11:38写道:
> >>>
> >>> > MiniCluster代码执行过程中报错:
> >>> >
> >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> >>> further details.
> >>> > Exception in thread "main" java.lang.IllegalStateException:
> >>> MiniCluster is not yet running.
> >>> >         at
> >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> >>> >         at
> >>>
> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> >>> >         at
> >>>
> org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
> >>> >         at
> >>>
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> >>> >         at
> >>>
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> >>> >         at
> >>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
> >>> >
> >>> > 报错段代码如下:
> >>> >
> >>> > Configuration config =
> >>> GlobalConfiguration.loadConfiguration(flinkConfDir);
> >>> > MiniClusterConfiguration.Builder configBuilder = new
> >>> MiniClusterConfiguration.Builder();
> >>> > configBuilder.setConfiguration(config);
> >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> >>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> >>> miniCluster);
> >>> >
> >>> > 其中flinkConfDir为/opt/flink/conf
> >>> >
> >>> >
> >>> > flink standalone HA集群信息如下:
> >>> > ------------------------------
> >>> > zjfpla...@hotmail.com
> >>> >
> >>> >
> >>> >
> >>>
> >>
>

回复