Hi

You can refer to org.apache.flink.streaming.api.environment.LocalStreamEnvironment::execute,
which starts the MiniCluster before running the job:

public JobExecutionResult execute(String jobName) throws Exception {
  // transform the streaming program into a JobGraph
  StreamGraph streamGraph = getStreamGraph();
  streamGraph.setJobName(jobName);

  JobGraph jobGraph = streamGraph.getJobGraph();
  jobGraph.setAllowQueuedScheduling(true);

  Configuration configuration = new Configuration();
  configuration.addAll(jobGraph.getJobConfiguration());
  configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

  // add (and override) the settings with what the user defined
  configuration.addAll(this.configuration);

  if (!configuration.contains(RestOptions.BIND_PORT)) {
   configuration.setString(RestOptions.BIND_PORT, "0");
  }

  int numSlotsPerTaskManager = configuration.getInteger(
   TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

  MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
   .setConfiguration(configuration)
   .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
   .build();

  if (LOG.isInfoEnabled()) {
   LOG.info("Running job on local embedded Flink mini cluster");
  }

  MiniCluster miniCluster = new MiniCluster(cfg);

  try {
   miniCluster.start();
   configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

   return miniCluster.executeJobBlocking(jobGraph);
  }
  finally {
   transformations.clear();
   miniCluster.close();
  }
}
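
Applied to the snippet from the original question, a minimal sketch might look like the following. It assumes the Flink 1.9-era API used in this thread (in particular the MiniClusterClient(Configuration, MiniCluster) constructor, which appears in the stack trace). The class name MiniClusterClientSketch and the helper startMiniCluster are hypothetical names used only for illustration.

```java
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

// Hypothetical class name, for illustration only.
public class MiniClusterClientSketch {

    // Hypothetical helper: builds a MiniCluster from the given configuration and starts it.
    static MiniCluster startMiniCluster(Configuration config) throws Exception {
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(config)
            .build();
        MiniCluster miniCluster = new MiniCluster(cfg);
        // Without this call, the MiniClusterClient constructor fails with
        // "IllegalStateException: MiniCluster is not yet running."
        miniCluster.start();
        return miniCluster;
    }

    public static void main(String[] args) throws Exception {
        // Same configuration directory as in the original question.
        Configuration config = GlobalConfiguration.loadConfiguration("/opt/flink/conf");
        MiniCluster miniCluster = startMiniCluster(config);
        try {
            // Assumed constructor (Flink 1.9-era), as used in the original snippet.
            MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
            // ... submit the job through clusterClient here ...
        } finally {
            // Shut the embedded cluster down once the client is no longer needed.
            miniCluster.close();
        }
    }
}
```

The only change relative to the failing code is that start() is called before the client is constructed; the try/finally mirrors what LocalStreamEnvironment::execute does above.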

Best,
Eleanore

On Wed, Jan 15, 2020 at 8:40 PM tison <[email protected]> wrote:

> You need to start the MiniCluster first (x
>
> Best,
> tison.
>
>
> On Thu, Jan 16, 2020 at 11:38 AM 郑 洁锋 <[email protected]> wrote:
>
> > The MiniCluster code fails at runtime with the following error:
> >
> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> > Exception in thread "main" java.lang.IllegalStateException: MiniCluster is not yet running.
> >         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> >         at org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> >         at org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
> >         at com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> >         at com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> >         at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
> >
> > The code that raises the error is:
> >
> > Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
> > MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
> > configBuilder.setConfiguration(config);
> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> > MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
> >
> > where flinkConfDir is /opt/flink/conf
> >
> >
> > The Flink standalone HA cluster information is as follows:
> > ------------------------------
> > [email protected]
> >
> >
> >
>
