Hi
You can refer to org.apache.flink.streaming.api.environment.LocalStreamEnvironment::execute:
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);

    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);

    if (!configuration.contains(RestOptions.BIND_PORT)) {
        configuration.setString(RestOptions.BIND_PORT, "0");
    }

    int numSlotsPerTaskManager = configuration.getInteger(
        TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();

    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }

    MiniCluster miniCluster = new MiniCluster(cfg);

    try {
        // the cluster must be started before it is used
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT,
            miniCluster.getRestAddress().get().getPort());

        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
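
The "MiniCluster is not yet running." message in the stack trace comes from a state check that fails when the cluster is used before start() has been called. As a rough illustration of why the order matters, here is a small stand-alone sketch. TinyCluster, TinyClient and LifecycleSketch are hypothetical names made up for this example; they are not Flink classes and only mimic the start-before-use lifecycle.

```java
public class LifecycleSketch {

    /** Hypothetical stand-in for a cluster with a start/close lifecycle (not Flink code). */
    static final class TinyCluster implements AutoCloseable {
        private boolean running;

        void start() {
            running = true;
        }

        String getServices() {
            // Mirrors the kind of precondition seen in the stack trace:
            // using the cluster before start() is rejected.
            if (!running) {
                throw new IllegalStateException("MiniCluster is not yet running.");
            }
            return "services";
        }

        @Override
        public void close() {
            running = false;
        }
    }

    /** Hypothetical client that asks the cluster for its services in the constructor. */
    static final class TinyClient {
        final String services;

        TinyClient(TinyCluster cluster) {
            this.services = cluster.getServices();
        }
    }

    /** Wrong order: the client is created before the cluster is started. */
    static String clientBeforeStart() {
        TinyCluster cluster = new TinyCluster();
        try {
            new TinyClient(cluster);
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** Right order: start the cluster, create the client, close the cluster at the end. */
    static boolean clientAfterStart() {
        try (TinyCluster cluster = new TinyCluster()) {
            cluster.start();
            return new TinyClient(cluster).services != null;
        }
    }

    public static void main(String[] args) {
        System.out.println(clientBeforeStart());
        System.out.println(clientAfterStart());
    }
}
```

For the snippet in the original question, the equivalent change would presumably be to call miniCluster.start() before new MiniClusterClient(config, miniCluster), and to close the cluster once it is no longer needed.
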
Best,
Eleanore
On Wed, Jan 15, 2020 at 8:40 PM tison <[email protected]> wrote:
> You need to start your MiniCluster first :)
>
> Best,
> tison.
>
>
> On Thu, Jan 16, 2020 at 11:38 AM 郑 洁锋 <[email protected]> wrote:
>
> > The MiniCluster code throws an error while running:
> >
> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> > Exception in thread "main" java.lang.IllegalStateException: MiniCluster is not yet running.
> >     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> >     at org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> >     at org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
> >     at com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> >     at com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> >     at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
> >
> > The code that raises the error is as follows:
> >
> > Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
> > MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
> > configBuilder.setConfiguration(config);
> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> > MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
> >
> > Here flinkConfDir is /opt/flink/conf.
> >
> >
> > The Flink standalone HA cluster information is as follows:
> > ------------------------------
> > [email protected]
> >
> >
> >
>