This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8212ba8d3721f7fc985036473b76bee32eddbdc7 Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Mon Nov 18 10:25:19 2019 +0100 [FLINK-XXXXX] Deduplicate the Executor code and fix javadocs. --- .../deployment/AbstractJobClusterExecutor.java | 71 ++++++++++++++++++++++ .../deployment/AbstractSessionClusterExecutor.java | 66 ++++++++++++++++++++ .../flink/client/deployment/ExecutorUtils.java | 59 ++++++++++++++++++ .../StandaloneSessionClusterExecutor.java | 63 +------------------ .../org/apache/flink/core/execution/Executor.java | 13 ++-- .../yarn/executors/YarnJobClusterExecutor.java | 68 +-------------------- .../yarn/executors/YarnSessionClusterExecutor.java | 53 +--------------- 7 files changed, 214 insertions(+), 179 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java new file mode 100644 index 0000000..310dd61 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.deployment; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.cli.ExecutionConfigAccessor; +import org.apache.flink.client.deployment.executors.JobClientImpl; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters. + * + * @param <ClusterID> the type of the id of the cluster. + * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster. 
+ */ +@Internal +public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class); + + private final ClientFactory clusterClientFactory; + + public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) { + this.clusterClientFactory = checkNotNull(clusterClientFactory); + } + + @Override + public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception { + final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration); + + try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { + final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); + + final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); + + final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()); + LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); + return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID())); + } + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java new file mode 100644 index 0000000..2150b18 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.ClientUtils; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on an existing (session) cluster. + * + * @param <ClusterID> the type of the id of the cluster. + * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster. 
+ */ +@Internal +public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor { + + private final ClientFactory clusterClientFactory; + + public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) { + this.clusterClientFactory = checkNotNull(clusterClientFactory); + } + + @Override + public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception { + final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration); + + try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { + final ClusterID clusterID = clusterClientFactory.getClusterId(configuration); + checkState(clusterID != null); + + // the caller should take care of managing the life-cycle of the returned JobClient. + + final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID); + return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph); + } + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java new file mode 100644 index 0000000..af540cb --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment; + +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.FlinkPipelineTranslationUtil; +import org.apache.flink.client.cli.ExecutionConfigAccessor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import javax.annotation.Nonnull; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utility class with methods related to job execution. + */ +public class ExecutorUtils { + + /** + * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}. + * + * @param pipeline the pipeline whose job graph we are computing + * @param configuration the configuration with the necessary information such as jars and + * classpaths to be included, the parallelism of the job and potential + * savepoint settings used to bootstrap its state. + * @return the corresponding {@link JobGraph}. 
+ */ + public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) { + checkNotNull(pipeline); + checkNotNull(configuration); + + final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); + final JobGraph jobGraph = FlinkPipelineTranslationUtil + .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism()); + + jobGraph.addJars(executionConfigAccessor.getJars()); + jobGraph.setClasspaths(executionConfigAccessor.getClasspaths()); + jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings()); + + return jobGraph; + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java index df4d2ba..f097323 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java @@ -19,77 +19,20 @@ package org.apache.flink.client.deployment.executors; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.client.ClientUtils; -import org.apache.flink.client.FlinkPipelineTranslationUtil; -import org.apache.flink.client.cli.ExecutionConfigAccessor; +import org.apache.flink.client.deployment.AbstractSessionClusterExecutor; import org.apache.flink.client.deployment.StandaloneClientFactory; -import org.apache.flink.client.deployment.StandaloneClusterDescriptor; import org.apache.flink.client.deployment.StandaloneClusterId; -import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.Executor; -import 
org.apache.flink.core.execution.JobClient; -import org.apache.flink.runtime.jobgraph.JobGraph; - -import java.net.URL; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * The {@link Executor} to be used when executing a job on an already running cluster. */ @Internal -public class StandaloneSessionClusterExecutor implements Executor { +public class StandaloneSessionClusterExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> { public static final String NAME = "standalone-session-cluster"; - private final StandaloneClientFactory clusterClientFactory; - public StandaloneSessionClusterExecutor() { - this.clusterClientFactory = new StandaloneClientFactory(); - } - - @Override - public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception { - final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); - - final List<URL> dependencies = configAccessor.getJars(); - final List<URL> classpaths = configAccessor.getClasspaths(); - - final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies); - - try (final StandaloneClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { - final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration); - checkState(clusterID != null); - - final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID); - return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph); - } - } - - private JobGraph getJobGraph( - final Pipeline pipeline, - final Configuration configuration, - final List<URL> classpaths, - final List<URL> libraries) { - - checkNotNull(pipeline); - checkNotNull(configuration); - 
checkNotNull(classpaths); - checkNotNull(libraries); - - final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); - final JobGraph jobGraph = FlinkPipelineTranslationUtil - .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism()); - - jobGraph.addJars(libraries); - jobGraph.setClasspaths(classpaths); - jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings()); - - return jobGraph; + super(new StandaloneClientFactory()); } } diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java index 5be3193..b585be6 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java @@ -18,10 +18,11 @@ package org.apache.flink.core.execution; -import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.configuration.Configuration; +import javax.annotation.Nonnull; + import java.util.concurrent.CompletableFuture; /** @@ -30,11 +31,15 @@ import java.util.concurrent.CompletableFuture; public interface Executor { /** - * Executes a {@link Pipeline} based on the provided configuration. + * Executes a {@link Pipeline} based on the provided configuration and returns a {@link JobClient} which allows to + * interact with the job being executed, e.g. cancel it or take a savepoint. + * + * <p><b>ATTENTION:</b> The caller is responsible for managing the lifecycle of the returned {@link JobClient}. This + * means that e.g. {@code close()} should be called explicitly at the call-site. * * @param pipeline the {@link Pipeline} to execute * @param configuration the {@link Configuration} with the required execution parameters - * @return the {@link JobExecutionResult} corresponding to the pipeline execution. 
+ * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline. */ - CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception; + CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception; } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java index ead6e9a..084b020 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java @@ -19,28 +19,11 @@ package org.apache.flink.yarn.executors; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.client.FlinkPipelineTranslationUtil; -import org.apache.flink.client.cli.ExecutionConfigAccessor; -import org.apache.flink.client.deployment.ClusterSpecification; -import org.apache.flink.client.deployment.executors.JobClientImpl; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.client.deployment.AbstractJobClusterExecutor; import org.apache.flink.core.execution.Executor; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.yarn.YarnClusterClientFactory; -import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URL; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * The {@link Executor} to be used when executing a job in isolation. 
@@ -48,56 +31,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * tear it down when the job is finished either successfully or due to an error. */ @Internal -public class YarnJobClusterExecutor implements Executor { - - private static final Logger LOG = LoggerFactory.getLogger(YarnJobClusterExecutor.class); +public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> { public static final String NAME = "yarn-job-cluster"; - private final YarnClusterClientFactory clusterClientFactory; - public YarnJobClusterExecutor() { - this.clusterClientFactory = new YarnClusterClientFactory(); - } - - @Override - public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception { - - try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) { - final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig); - - final List<URL> dependencies = configAccessor.getJars(); - final List<URL> classpaths = configAccessor.getClasspaths(); - - final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, dependencies); - - final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig); - - final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()); - LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); - return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID())); - } - } - - private JobGraph getJobGraph( - final Pipeline pipeline, - final Configuration configuration, - final List<URL> classpaths, - final List<URL> libraries) { - - checkNotNull(pipeline); - checkNotNull(configuration); - checkNotNull(classpaths); - checkNotNull(libraries); - - final ExecutionConfigAccessor 
executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); - final JobGraph jobGraph = FlinkPipelineTranslationUtil - .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism()); - - jobGraph.addJars(libraries); - jobGraph.setClasspaths(classpaths); - jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings()); - - return jobGraph; + super(new YarnClusterClientFactory()); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java index de4d148..873dce4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java @@ -19,68 +19,21 @@ package org.apache.flink.yarn.executors; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.client.ClientUtils; -import org.apache.flink.client.FlinkPipelineTranslationUtil; -import org.apache.flink.client.cli.ExecutionConfigAccessor; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.client.deployment.AbstractSessionClusterExecutor; import org.apache.flink.core.execution.Executor; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.yarn.YarnClusterClientFactory; -import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.hadoop.yarn.api.records.ApplicationId; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; - /** * The {@link Executor} to be used when executing a job on an already running cluster. 
*/ @Internal -public class YarnSessionClusterExecutor implements Executor { +public class YarnSessionClusterExecutor extends AbstractSessionClusterExecutor<ApplicationId, YarnClusterClientFactory> { public static final String NAME = "yarn-session-cluster"; - private final YarnClusterClientFactory clusterClientFactory; - public YarnSessionClusterExecutor() { - this.clusterClientFactory = new YarnClusterClientFactory(); - } - - @Override - public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception { - final JobGraph jobGraph = getJobGraph(pipeline, configuration); - - try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { - final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration); - checkState(clusterID != null); - - // TODO: 17.11.19 we cannot close the client here because we simply have a future of the client - final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID); - return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph); - } - } - - private JobGraph getJobGraph( - final Pipeline pipeline, - final Configuration configuration) { - - checkNotNull(pipeline); - checkNotNull(configuration); - - final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); - final JobGraph jobGraph = FlinkPipelineTranslationUtil - .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism()); - - jobGraph.addJars(executionConfigAccessor.getJars()); - jobGraph.setClasspaths(executionConfigAccessor.getClasspaths()); - jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings()); - - return jobGraph; + super(new YarnClusterClientFactory()); } }