This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f960c8b76a16cc00208e05f1cb6235305dbc9d25
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Nov 18 15:22:19 2019 +0100

    [hotfix] Simplify ContextEnvironment construction to use configuration
---
 .../java/org/apache/flink/client/ClientUtils.java  | 25 ++-------
 .../flink/client/program/ContextEnvironment.java   | 41 ++++++++------
 .../client/program/ContextEnvironmentFactory.java  | 64 +++++++---------------
 .../apache/flink/client/program/ClientTest.java    |  5 ++
 4 files changed, 57 insertions(+), 78 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5e53bc3..1654dff 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ContextEnvironmentFactory;
@@ -30,10 +29,10 @@ import 
org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -150,33 +149,21 @@ public enum ClientUtils {
                        ClusterClient<?> client,
                        PackagedProgram program) throws 
ProgramMissingJobException, ProgramInvocationException {
 
-               final ExecutionConfigAccessor executionConfigAccessor = 
ExecutionConfigAccessor.fromConfiguration(configuration);
-
-               final List<URL> jobJars = executionConfigAccessor.getJars();
-               final List<URL> classpaths = 
executionConfigAccessor.getClasspaths();
-               final SavepointRestoreSettings savepointSettings = 
executionConfigAccessor.getSavepointRestoreSettings();
-               final int parallelism = 
executionConfigAccessor.getParallelism();
-               final boolean detached = 
executionConfigAccessor.getDetachedMode();
-
                final ClassLoader userCodeClassLoader = 
program.getUserCodeClassLoader();
 
                final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
                try {
                        
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 
-                       LOG.info("Starting program (detached: {})", detached);
+                       LOG.info("Starting program (detached: {})", 
!configuration.getBoolean(DeploymentOptions.ATTACHED));
 
                        final AtomicReference<JobExecutionResult> 
jobExecutionResult = new AtomicReference<>();
 
                        ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(
-                               client,
-                               jobJars,
-                               classpaths,
-                               userCodeClassLoader,
-                               parallelism,
-                               detached,
-                               savepointSettings,
-                               jobExecutionResult);
+                                       configuration,
+                                       client,
+                                       userCodeClassLoader,
+                                       jobExecutionResult);
                        ContextEnvironment.setAsContext(factory);
 
                        try {
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 08a02af..9a03271 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
@@ -32,6 +34,8 @@ import java.net.URL;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Execution Environment for remote execution with the Client.
  */
@@ -54,23 +58,28 @@ public class ContextEnvironment extends 
ExecutionEnvironment {
        private boolean alreadyCalled;
 
        public ContextEnvironment(
-               ClusterClient<?> remoteConnection,
-               List<URL> jarFiles,
-               List<URL> classpaths,
-               ClassLoader userCodeClassLoader,
-               SavepointRestoreSettings savepointSettings,
-               boolean detached,
-               AtomicReference<JobExecutionResult> jobExecutionResult) {
-               this.client = remoteConnection;
-               this.jarFilesToAttach = jarFiles;
-               this.classpathsToAttach = classpaths;
-               this.userCodeClassLoader = userCodeClassLoader;
-               this.savepointSettings = savepointSettings;
-
-               this.detached = detached;
-               this.alreadyCalled = false;
+                       final Configuration configuration,
+                       final ClusterClient<?> remoteConnection,
+                       final ClassLoader userCodeClassLoader,
+                       final AtomicReference<JobExecutionResult> 
jobExecutionResult) {
+
+               this.client = checkNotNull(remoteConnection);
+               this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+               this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
+               final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
+                               .fromConfiguration(checkNotNull(configuration));
 
-               this.jobExecutionResult = jobExecutionResult;
+               if (accessor.getParallelism() > 0) {
+                       setParallelism(accessor.getParallelism());
+               }
+
+               this.jarFilesToAttach = accessor.getJars();
+               this.classpathsToAttach = accessor.getClasspaths();
+               this.savepointSettings = accessor.getSavepointRestoreSettings();
+               this.detached = accessor.getDetachedMode();
+
+               this.alreadyCalled = false;
        }
 
        @Override
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ff7f15b..ab589f2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The factory that instantiates the environment to be used when running jobs 
that are
  * submitted through a pre-configured client connection.
@@ -35,64 +36,41 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-       private final ClusterClient<?> client;
-
-       private final List<URL> jarFilesToAttach;
+       private final Configuration configuration;
 
-       private final List<URL> classpathsToAttach;
+       private final ClusterClient<?> client;
 
        private final ClassLoader userCodeClassLoader;
 
-       private final int defaultParallelism;
-
-       private final boolean isDetached;
-
-       private final SavepointRestoreSettings savepointSettings;
-
        private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
        private boolean alreadyCalled;
 
        public ContextEnvironmentFactory(
-               ClusterClient<?> client,
-               List<URL> jarFilesToAttach,
-               List<URL> classpathsToAttach,
-               ClassLoader userCodeClassLoader,
-               int defaultParallelism,
-               boolean isDetached,
-               SavepointRestoreSettings savepointSettings,
-               AtomicReference<JobExecutionResult> jobExecutionResult) {
-               this.client = client;
-               this.jarFilesToAttach = jarFilesToAttach;
-               this.classpathsToAttach = classpathsToAttach;
-               this.userCodeClassLoader = userCodeClassLoader;
-               this.defaultParallelism = defaultParallelism;
-               this.isDetached = isDetached;
-               this.savepointSettings = savepointSettings;
+                       final Configuration configuration,
+                       final ClusterClient<?> client,
+                       final ClassLoader userCodeClassLoader,
+                       final AtomicReference<JobExecutionResult> 
jobExecutionResult) {
+               this.configuration = checkNotNull(configuration);
+               this.client = checkNotNull(client);
+               this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+               this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
                this.alreadyCalled = false;
-               this.jobExecutionResult = jobExecutionResult;
        }
 
        @Override
        public ExecutionEnvironment createExecutionEnvironment() {
                verifyCreateIsCalledOnceWhenInDetachedMode();
-
-               final ContextEnvironment environment = new ContextEnvironment(
-                       client,
-                       jarFilesToAttach,
-                       classpathsToAttach,
-                       userCodeClassLoader,
-                       savepointSettings,
-                       isDetached,
-                       jobExecutionResult);
-               if (defaultParallelism > 0) {
-                       environment.setParallelism(defaultParallelism);
-               }
-               return environment;
+               return new ContextEnvironment(
+                               configuration,
+                               client,
+                               userCodeClassLoader,
+                               jobExecutionResult);
        }
 
        private void verifyCreateIsCalledOnceWhenInDetachedMode() {
-               if (isDetached && alreadyCalled) {
+               if (!configuration.getBoolean(DeploymentOptions.ATTACHED) && 
alreadyCalled) {
                        throw new InvalidProgramException("Multiple 
environments cannot be created in detached mode");
                }
                alreadyCalled = true;
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 5845080..6e53abb 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -64,6 +64,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -193,6 +194,10 @@ public class ClientTest extends TestLogger {
        @Test
        public void tryLocalExecution() throws ProgramInvocationException, 
ProgramMissingJobException {
                PackagedProgram packagedProgramMock = 
mock(PackagedProgram.class);
+
+               when(packagedProgramMock.getUserCodeClassLoader())
+                               
.thenReturn(packagedProgramMock.getClass().getClassLoader());
+
                doAnswer(new Answer<Void>() {
                        @Override
                        public Void answer(InvocationOnMock invocation) throws 
Throwable {

Reply via email to