This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors-clean in repository https://gitbox.apache.org/repos/asf/flink.git
commit f960c8b76a16cc00208e05f1cb6235305dbc9d25 Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Mon Nov 18 15:22:19 2019 +0100 [hotfix] Simplify ContextEnvironment construction to use configuration --- .../java/org/apache/flink/client/ClientUtils.java | 25 ++------- .../flink/client/program/ContextEnvironment.java | 41 ++++++++------ .../client/program/ContextEnvironmentFactory.java | 64 +++++++--------------- .../apache/flink/client/program/ClientTest.java | 5 ++ 4 files changed, 57 insertions(+), 78 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index 5e53bc3..1654dff 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -20,7 +20,6 @@ package org.apache.flink.client; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobSubmissionResult; -import org.apache.flink.client.cli.ExecutionConfigAccessor; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ContextEnvironment; import org.apache.flink.client.program.ContextEnvironmentFactory; @@ -30,10 +29,10 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramMissingJobException; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.ExceptionUtils; @@ -150,33 +149,21 @@ public enum ClientUtils { ClusterClient<?> client, PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException { - final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); - - final List<URL> jobJars = executionConfigAccessor.getJars(); - final List<URL> classpaths = executionConfigAccessor.getClasspaths(); - final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings(); - final int parallelism = executionConfigAccessor.getParallelism(); - final boolean detached = executionConfigAccessor.getDetachedMode(); - final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader(); final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(userCodeClassLoader); - LOG.info("Starting program (detached: {})", detached); + LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED)); final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>(); ContextEnvironmentFactory factory = new ContextEnvironmentFactory( - client, - jobJars, - classpaths, - userCodeClassLoader, - parallelism, - detached, - savepointSettings, - jobExecutionResult); + configuration, + client, + userCodeClassLoader, + jobExecutionResult); ContextEnvironment.setAsContext(factory); try { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index 08a02af..9a03271 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -25,6 +25,8 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.FlinkPipelineTranslationUtil; +import org.apache.flink.client.cli.ExecutionConfigAccessor; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -32,6 +34,8 @@ import java.net.URL; import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Execution Environment for remote execution with the Client. */ @@ -54,23 +58,28 @@ public class ContextEnvironment extends ExecutionEnvironment { private boolean alreadyCalled; public ContextEnvironment( - ClusterClient<?> remoteConnection, - List<URL> jarFiles, - List<URL> classpaths, - ClassLoader userCodeClassLoader, - SavepointRestoreSettings savepointSettings, - boolean detached, - AtomicReference<JobExecutionResult> jobExecutionResult) { - this.client = remoteConnection; - this.jarFilesToAttach = jarFiles; - this.classpathsToAttach = classpaths; - this.userCodeClassLoader = userCodeClassLoader; - this.savepointSettings = savepointSettings; - - this.detached = detached; - this.alreadyCalled = false; + final Configuration configuration, + final ClusterClient<?> remoteConnection, + final ClassLoader userCodeClassLoader, + final AtomicReference<JobExecutionResult> jobExecutionResult) { + + this.client = checkNotNull(remoteConnection); + this.userCodeClassLoader = checkNotNull(userCodeClassLoader); + this.jobExecutionResult = checkNotNull(jobExecutionResult); + + final ExecutionConfigAccessor accessor = ExecutionConfigAccessor + .fromConfiguration(checkNotNull(configuration)); - this.jobExecutionResult = jobExecutionResult; + if (accessor.getParallelism() > 0) { + setParallelism(accessor.getParallelism()); + } + + this.jarFilesToAttach = accessor.getJars(); + this.classpathsToAttach = accessor.getClasspaths(); + this.savepointSettings = accessor.getSavepointRestoreSettings(); + this.detached = accessor.getDetachedMode(); + + this.alreadyCalled = false; } @Override diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java index ff7f15b..ab589f2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java @@ -22,12 +22,13 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; -import java.net.URL; -import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * The factory that instantiates the environment to be used when running jobs that are * submitted through a pre-configured client connection. @@ -35,64 +36,41 @@ import java.util.concurrent.atomic.AtomicReference; */ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory { - private final ClusterClient<?> client; - - private final List<URL> jarFilesToAttach; + private final Configuration configuration; - private final List<URL> classpathsToAttach; + private final ClusterClient<?> client; private final ClassLoader userCodeClassLoader; - private final int defaultParallelism; - - private final boolean isDetached; - - private final SavepointRestoreSettings savepointSettings; - private final AtomicReference<JobExecutionResult> jobExecutionResult; private boolean alreadyCalled; public ContextEnvironmentFactory( - ClusterClient<?> client, - List<URL> jarFilesToAttach, - List<URL> classpathsToAttach, - ClassLoader userCodeClassLoader, - int defaultParallelism, - boolean isDetached, - SavepointRestoreSettings savepointSettings, - AtomicReference<JobExecutionResult> jobExecutionResult) { - this.client = client; - this.jarFilesToAttach = jarFilesToAttach; - this.classpathsToAttach = classpathsToAttach; - this.userCodeClassLoader = userCodeClassLoader; - this.defaultParallelism = defaultParallelism; - this.isDetached = isDetached; - this.savepointSettings = savepointSettings; + final Configuration configuration, + final ClusterClient<?> client, + final ClassLoader userCodeClassLoader, + final AtomicReference<JobExecutionResult> jobExecutionResult) { + this.configuration = checkNotNull(configuration); + this.client = checkNotNull(client); + this.userCodeClassLoader = checkNotNull(userCodeClassLoader); + this.jobExecutionResult = checkNotNull(jobExecutionResult); + this.alreadyCalled = false; - this.jobExecutionResult = jobExecutionResult; } @Override public ExecutionEnvironment createExecutionEnvironment() { verifyCreateIsCalledOnceWhenInDetachedMode(); - - final ContextEnvironment environment = new ContextEnvironment( - client, - jarFilesToAttach, - classpathsToAttach, - userCodeClassLoader, - savepointSettings, - isDetached, - jobExecutionResult); - if (defaultParallelism > 0) { - environment.setParallelism(defaultParallelism); - } - return environment; + return new ContextEnvironment( + configuration, + client, + userCodeClassLoader, + jobExecutionResult); } private void verifyCreateIsCalledOnceWhenInDetachedMode() { - if (isDetached && alreadyCalled) { + if (!configuration.getBoolean(DeploymentOptions.ATTACHED) && alreadyCalled) { throw new InvalidProgramException("Multiple environments cannot be created in detached mode"); } alreadyCalled = true; diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 5845080..6e53abb 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -64,6 +64,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Simple and maybe stupid test to check the {@link ClusterClient} class. @@ -193,6 +194,10 @@ public class ClientTest extends TestLogger { @Test public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException { PackagedProgram packagedProgramMock = mock(PackagedProgram.class); + + when(packagedProgramMock.getUserCodeClassLoader()) + .thenReturn(packagedProgramMock.getClass().getClassLoader()); + doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable {