[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-04-21 Thread GitBox


tillrohrmann commented on a change in pull request #11473:
URL: https://github.com/apache/flink/pull/11473#discussion_r412281489



##
File path: flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java
##
@@ -41,5 +41,5 @@
 * @param configuration the {@link Configuration} with the required execution parameters
 * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
 */
-   CompletableFuture execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
+   CompletableFuture execute(final Pipeline pipeline, final Configuration configuration) throws Exception;

Review comment:
   I think this change is not necessary and can be reverted. No explicit 
cast should be required if you simply use `thenApply(Function.identity())` 
with the right generics.
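
   A minimal sketch of that suggestion, under assumptions: `Client` and `LocalClient` are hypothetical stand-ins for `JobClient` and one of its implementations, and `submit()` is an invented producer that only knows the concrete type. It shows how a future of a subtype can be widened to a future of the interface without a cast, which would let the interface method keep its original return type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class FutureWidening {

    // Hypothetical stand-in for an interface such as JobClient.
    interface Client {
        String id();
    }

    // Hypothetical concrete implementation returned by a specific executor.
    static final class LocalClient implements Client {
        @Override
        public String id() {
            return "local";
        }
    }

    // Assumed producer: its future is typed with the concrete class.
    static CompletableFuture<LocalClient> submit() {
        return CompletableFuture.completedFuture(new LocalClient());
    }

    // Widens CompletableFuture<LocalClient> to CompletableFuture<Client>.
    // The explicit type witness on identity() supplies "the right generics",
    // so neither a cast nor a wildcard return type is needed.
    static CompletableFuture<Client> execute() {
        return submit().thenApply(Function.<Client>identity());
    }

    public static void main(String[] args) {
        System.out.println(execute().join().id()); // prints "local"
    }
}
```

   The extra `thenApply` stage only passes the value through; its purpose is to give the compiler a point at which the result type can change from the subtype to the interface.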





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-04-02 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r402431325
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or 
exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function 
miniClusterFactory;
+
+   public PerJobMiniCluster() {
+   this(new Configuration());
+   }
+
+   public PerJobMiniCluster(Configuration configuration) {
+   this(configuration, MiniCluster::new);
+   }
+
+   public PerJobMiniCluster(Configuration configuration, 
Function miniClusterFactory) {
+   this.configuration = configuration;
+   this.miniClusterFactory = miniClusterFactory;
+   }
+
+   /**
+* Starts a {@link MiniCluster} and submits a job.
+*/
+   public CompletableFuture submitJob(JobGraph jobGraph) throws 
Exception {
+   MiniClusterConfiguration miniClusterConfig = 
getMiniClusterConfig(jobGraph.getMaximumParallelism());
+   MiniCluster miniCluster = 
miniClusterFactory.apply(miniClusterConfig);
+   miniCluster.start();
+
+   return miniCluster
+   .submitJob(jobGraph)
+   .handle((result, throwable) -> {
+   if (throwable != null) {
+   try {
+   throw new 
RuntimeException("Error submitting job to MiniCluster.", throwable);
+   } finally {
+   shutDownCluster(miniCluster);
+   }
+   }
+   return new 
PerJobMiniClusterJobClient(result.getJobID(), miniCluster);
+   });
+   }
+
+   private MiniClusterConfiguration getMiniClusterConfig(int 
maximumParallelism) {
+   Configuration configuration = this.configuration.clone();
 
 Review comment:
   Java's `Cloneable` mechanism has many problems. In a nutshell, it is 
fragile, dangerous and extralinguistic, because it creates objects without 
calling a constructor. The only things it should be used on are arrays, if I'm 
not mistaken.
   
   To name a few of the problems in detail: The `Cloneable` interface is a 
marker interface. Hence, before you call `clone` you actually need to make sure 
that the type implements `Cloneable`. But nothing prevents you from calling 
`clone` on a type that does not implement it.
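
   A small sketch of the marker-interface problem described above, together with one common alternative. Everything here is hypothetical: `Settings` and `CopyableSettings` are invented classes that stand in for a configuration object; they are not Flink's `Configuration`. The first class exposes `clone()` without implementing `Cloneable`, which compiles but fails at runtime; the second uses a copy constructor, which goes through a normal constructor.

```java
import java.util.HashMap;
import java.util.Map;

final class CloneablePitfall {

    // Hypothetical class that does NOT implement Cloneable but still
    // exposes clone() by delegating to Object.clone().
    static class Settings {
        @Override
        public Settings clone() throws CloneNotSupportedException {
            // Compiles fine; Object.clone() throws at runtime because
            // Settings lacks the Cloneable marker interface.
            return (Settings) super.clone();
        }
    }

    // Alternative: an explicit copy constructor, no Cloneable involved.
    static final class CopyableSettings {
        private final Map<String, String> values;

        CopyableSettings() {
            this.values = new HashMap<>();
        }

        // Copies the entries so that the two instances are independent.
        CopyableSettings(CopyableSettings other) {
            this.values = new HashMap<>(other.values);
        }

        void set(String key, String value) {
            values.put(key, value);
        }

        String get(String key) {
            return values.get(key);
        }
    }

    // Returns true if calling clone() on Settings fails as described.
    static boolean cloneFails() {
        try {
            new Settings().clone();
            return false;
        } catch (CloneNotSupportedException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(cloneFails()); // prints "true"

        CopyableSettings original = new CopyableSettings();
        original.set("port", "0");
        CopyableSettings copy = new CopyableSettings(original);
        copy.set("port", "8081");
        System.out.println(original.get("port")); // prints "0"
    }
}
```

   The copy constructor makes the copying logic visible and type-checked, at the cost of having to be maintained by hand when fields are added.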
   
   If a super class implements `Cloneable`, then it is hard for a subtype to 
not implement it. The only way is to 

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-04-02 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r402202469
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterTest.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CancelableInvokable;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link PerJobMiniCluster}.
+ */
+public class PerJobMiniClusterTest {
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private MiniCluster miniCluster;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+   temporaryFolder.create();
+   }
+
+   @Test
+   public void testJobExecution() throws Exception {
+   PerJobMiniCluster perJobMiniCluster = initializeMiniCluster();
+
+   JobClient jobClient = 
perJobMiniCluster.submitJob(getNoopJobGraph()).get();
+
+   JobExecutionResult jobExecutionResult = 
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+   assertThat(jobExecutionResult, is(notNullValue()));
+
+   Map actual = 
jobClient.getAccumulators(getClass().getClassLoader()).get();
+   assertThat(actual, is(notNullValue()));
+
+   assertThatMiniClusterIsShutdown();
+   }
+
+   @Test
+   public void testJobClient() throws Exception {
+   PerJobMiniCluster perJobMiniCluster = initializeMiniCluster();
+
+   JobGraph cancellableJobGraph = getCancellableJobGraph();
+   JobClient jobClient = perJobMiniCluster
+   .submitJob(cancellableJobGraph)
+   .get();
+
+   assertThat(jobClient.getJobID(), 
is(cancellableJobGraph.getJobID()));
+   assertThat(jobClient.getJobStatus().get(), 
is(JobStatus.RUNNING));
+
+   jobClient.cancel().get();
+
+   try {
+   
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+   } catch (ExecutionException e) {
+   assertThat(e.getMessage(), containsString("Failed to 
convert JobResult to JobExecutionResult."));
+   }
+
+   assertThatMiniClusterIsShutdown();
+   }
+
+   @Test
+   public void testJobClientSavepoint() throws Exception {
+   JobGraph jobGraph = new JobGraph();
+   String savepointPath = 
temporaryFolder.getRoot().getAbsolutePath();
+
+   PerJobMiniCluster perJobMiniCluster = new PerJobMiniCluster(new 
Configuration(), config -> {
+   // Use a mock in this test case to test calling of the 
savepoint methods
+   miniCluster = Mockito.mock(MiniCluster.class);
+
+   CompletableFuture 
jobSubmissionFuture = new CompletableFuture<>();
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-04-02 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r402202323
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterTest.java
 ##
 @@ -0,0 +1,234 @@

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400207088
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or 
exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function 
miniClusterFactory;
+
+   public PerJobMiniCluster() {
+   this(new Configuration());
+   }
+
+   public PerJobMiniCluster(Configuration configuration) {
+   this(configuration, MiniCluster::new);
+   }
+
+   public PerJobMiniCluster(Configuration configuration, 
Function miniClusterFactory) {
+   this.configuration = configuration;
+   this.miniClusterFactory = miniClusterFactory;
+   }
+
+   /**
+* Starts a {@link MiniCluster} and submits a job.
+*/
+   public CompletableFuture submitJob(JobGraph jobGraph) throws 
Exception {
+   MiniClusterConfiguration miniClusterConfig = 
getMiniClusterConfig(jobGraph.getMaximumParallelism());
+   MiniCluster miniCluster = 
miniClusterFactory.apply(miniClusterConfig);
+   miniCluster.start();
+
+   return miniCluster
+   .submitJob(jobGraph)
+   .handle((result, throwable) -> {
+   if (throwable != null) {
+   try {
+   throw new 
RuntimeException("Error submitting job to MiniCluster.", throwable);
+   } finally {
+   shutDownCluster(miniCluster);
+   }
+   }
+   return new 
PerJobMiniClusterJobClient(result.getJobID(), miniCluster);
+   });
+   }
+
+   private MiniClusterConfiguration getMiniClusterConfig(int 
maximumParallelism) {
+   Configuration configuration = this.configuration.clone();
+
+   if (!configuration.contains(RestOptions.BIND_PORT)) {
+   configuration.setString(RestOptions.BIND_PORT, "0");
+   }
+
+   int numTaskManagers = configuration.getInteger(
+   ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+   ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
+
+   // we have to use the maximum parallelism as a default here, 
otherwise streaming pipelines would not run
+   int numSlotsPerTaskManager = 
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, maximumParallelism);
+
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400234616
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterTest.java
 ##
 @@ -0,0 +1,234 @@

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400241303
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400197907
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or 
exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function 
miniClusterFactory;
+
+   public PerJobMiniCluster() {
+   this(new Configuration());
+   }
 
 Review comment:
   This constructor is never used. Please remove.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400242253
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function miniClusterFactory;
+
+   public PerJobMiniCluster() {
+       this(new Configuration());
+   }
+
+   public PerJobMiniCluster(Configuration configuration) {
+       this(configuration, MiniCluster::new);
+   }
+
+   public PerJobMiniCluster(Configuration configuration, Function miniClusterFactory) {
+       this.configuration = configuration;
+       this.miniClusterFactory = miniClusterFactory;
+   }
+
+   /**
+    * Starts a {@link MiniCluster} and submits a job.
+    */
+   public CompletableFuture submitJob(JobGraph jobGraph) throws Exception {
+       MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
+       MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
+       miniCluster.start();
+
+       return miniCluster
+           .submitJob(jobGraph)
+           .handle((result, throwable) -> {
+               if (throwable != null) {
+                   try {
+                       throw new RuntimeException("Error submitting job to MiniCluster.", throwable);
+                   } finally {
+                       shutDownCluster(miniCluster);
+                   }
+               }
+               return new PerJobMiniClusterJobClient(result.getJobID(), miniCluster);
+           });
+   }
+
+   private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
+       Configuration configuration = this.configuration.clone();
+
+       if (!configuration.contains(RestOptions.BIND_PORT)) {
+           configuration.setString(RestOptions.BIND_PORT, "0");
+       }
+
+       int numTaskManagers = configuration.getInteger(
+           ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+           ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
+
+       // we have to use the maximum parallelism as a default here, otherwise streaming pipelines would not run
+       int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, maximumParallelism);
+
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400207333
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterTest.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CancelableInvokable;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link PerJobMiniCluster}.
+ */
+public class PerJobMiniClusterTest {
 
 Review comment:
   ```suggestion
   public class PerJobMiniClusterTest extends TestLogger {
   ```




[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400229264
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterTest.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CancelableInvokable;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link PerJobMiniCluster}.
+ */
+public class PerJobMiniClusterTest {
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private MiniCluster miniCluster;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+       temporaryFolder.create();
+   }
+
+   @Test
+   public void testJobExecution() throws Exception {
+       PerJobMiniCluster perJobMiniCluster = initializeMiniCluster();
+
+       JobClient jobClient = perJobMiniCluster.submitJob(getNoopJobGraph()).get();
+
+       JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+       assertThat(jobExecutionResult, is(notNullValue()));
+
+       Map actual = jobClient.getAccumulators(getClass().getClassLoader()).get();
+       assertThat(actual, is(notNullValue()));
+
+       assertThatMiniClusterIsShutdown();
+   }
+
+   @Test
+   public void testJobClient() throws Exception {
+       PerJobMiniCluster perJobMiniCluster = initializeMiniCluster();
+
+       JobGraph cancellableJobGraph = getCancellableJobGraph();
+       JobClient jobClient = perJobMiniCluster
+           .submitJob(cancellableJobGraph)
+           .get();
+
+       assertThat(jobClient.getJobID(), is(cancellableJobGraph.getJobID()));
+       assertThat(jobClient.getJobStatus().get(), is(JobStatus.RUNNING));
+
+       jobClient.cancel().get();
+
+       try {
+           jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+       } catch (ExecutionException e) {
+           assertThat(e.getMessage(), containsString("Failed to convert JobResult to JobExecutionResult."));
+       }
+
+       assertThatMiniClusterIsShutdown();
+   }
+
+   @Test
+   public void testJobClientSavepoint() throws Exception {
+       JobGraph jobGraph = new JobGraph();
+       String savepointPath = temporaryFolder.getRoot().getAbsolutePath();
+
+       PerJobMiniCluster perJobMiniCluster = new PerJobMiniCluster(new Configuration(), config -> {
+           // Use a mock in this test case to test calling of the savepoint methods
+           miniCluster = Mockito.mock(MiniCluster.class);
+
+           CompletableFuture jobSubmissionFuture = new CompletableFuture<>();
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400230703
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function miniClusterFactory;
+
+   public PerJobMiniCluster() {
+       this(new Configuration());
+   }
+
+   public PerJobMiniCluster(Configuration configuration) {
+       this(configuration, MiniCluster::new);
+   }
+
+   public PerJobMiniCluster(Configuration configuration, Function miniClusterFactory) {
+       this.configuration = configuration;
+       this.miniClusterFactory = miniClusterFactory;
+   }
+
+   /**
+    * Starts a {@link MiniCluster} and submits a job.
+    */
+   public CompletableFuture submitJob(JobGraph jobGraph) throws Exception {
+       MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
+       MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
+       miniCluster.start();
+
+       return miniCluster
+           .submitJob(jobGraph)
+           .handle((result, throwable) -> {
+               if (throwable != null) {
+                   try {
+                       throw new RuntimeException("Error submitting job to MiniCluster.", throwable);
+                   } finally {
+                       shutDownCluster(miniCluster);
+                   }
+               }
+               return new PerJobMiniClusterJobClient(result.getJobID(), miniCluster);
+           });
+   }
+
+   private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
+       Configuration configuration = this.configuration.clone();
+
+       if (!configuration.contains(RestOptions.BIND_PORT)) {
+           configuration.setString(RestOptions.BIND_PORT, "0");
+       }
+
+       int numTaskManagers = configuration.getInteger(
+           ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+           ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
+
+       // we have to use the maximum parallelism as a default here, otherwise streaming pipelines would not run
+       int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, maximumParallelism);
+
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400200873
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function miniClusterFactory;
+
+   public PerJobMiniCluster() {
+       this(new Configuration());
+   }
+
+   public PerJobMiniCluster(Configuration configuration) {
+       this(configuration, MiniCluster::new);
+   }
+
+   public PerJobMiniCluster(Configuration configuration, Function miniClusterFactory) {
+       this.configuration = configuration;
+       this.miniClusterFactory = miniClusterFactory;
+   }
+
+   /**
+    * Starts a {@link MiniCluster} and submits a job.
+    */
+   public CompletableFuture submitJob(JobGraph jobGraph) throws Exception {
+       MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
+       MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
+       miniCluster.start();
+
+       return miniCluster
+           .submitJob(jobGraph)
+           .handle((result, throwable) -> {
+               if (throwable != null) {
+                   try {
+                       throw new RuntimeException("Error submitting job to MiniCluster.", throwable);
+                   } finally {
+                       shutDownCluster(miniCluster);
+                   }
+               }
+               return new PerJobMiniClusterJobClient(result.getJobID(), miniCluster);
+           });
+   }
+
+   private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
+       Configuration configuration = this.configuration.clone();
 
 Review comment:
   I know that I made `Configuration` `Cloneable`, but I now have to admit that this was a mistake. I would suggest using the copy constructor of `Configuration` instead, so that this pattern does not spread.
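
   The copy-constructor approach suggested here can be sketched as follows. This is a minimal, self-contained illustration and not Flink code: `Settings` is a hypothetical stand-in for `Configuration`, and the helper `withDefaults` and the key string are assumptions chosen only to mirror the shape of `getMiniClusterConfig`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a mutable configuration class; NOT Flink's Configuration. */
final class Settings {
    private final Map<String, String> values;

    Settings() {
        this.values = new HashMap<>();
    }

    /** Copy constructor: the new instance gets its own map, independent of the source. */
    Settings(Settings other) {
        this.values = new HashMap<>(other.values);
    }

    void setString(String key, String value) {
        values.put(key, value);
    }

    boolean contains(String key) {
        return values.containsKey(key);
    }

    String getString(String key, String defaultValue) {
        return values.getOrDefault(key, defaultValue);
    }
}

public class CopyConstructorSketch {

    /** Hypothetical helper: copy first, then fill in defaults on the copy only. */
    static Settings withDefaults(Settings original) {
        Settings copy = new Settings(original); // instead of original.clone()
        if (!copy.contains("rest.bind-port")) { // illustrative key, assumed for this sketch
            copy.setString("rest.bind-port", "0");
        }
        return copy;
    }

    public static void main(String[] args) {
        Settings original = new Settings();
        Settings effective = withDefaults(original);

        System.out.println(effective.getString("rest.bind-port", "unset")); // prints "0"
        System.out.println(original.contains("rest.bind-port"));            // prints "false"
    }
}
```

   The point of the sketch is that the caller's object is left untouched while the copy receives the defaults, without relying on `Cloneable` and its unchecked `clone()` contract.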




[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400221275
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function miniClusterFactory;
+
+   public PerJobMiniCluster() {
+       this(new Configuration());
+   }
+
+   public PerJobMiniCluster(Configuration configuration) {
+       this(configuration, MiniCluster::new);
+   }
+
+   public PerJobMiniCluster(Configuration configuration, Function miniClusterFactory) {
+       this.configuration = configuration;
+       this.miniClusterFactory = miniClusterFactory;
+   }
+
+   /**
+    * Starts a {@link MiniCluster} and submits a job.
+    */
+   public CompletableFuture submitJob(JobGraph jobGraph) throws Exception {
+       MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
+       MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
+       miniCluster.start();
+
+       return miniCluster
+           .submitJob(jobGraph)
+           .handle((result, throwable) -> {
+               if (throwable != null) {
+                   try {
+                       throw new RuntimeException("Error submitting job to MiniCluster.", throwable);
+                   } finally {
+                       shutDownCluster(miniCluster);
+                   }
+               }
+               return new PerJobMiniClusterJobClient(result.getJobID(), miniCluster);
+           });
+   }
+
+   private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
+       Configuration configuration = this.configuration.clone();
+
+       if (!configuration.contains(RestOptions.BIND_PORT)) {
+           configuration.setString(RestOptions.BIND_PORT, "0");
+       }
+
+       int numTaskManagers = configuration.getInteger(
+           ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+           ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
+
+       // we have to use the maximum parallelism as a default here, otherwise streaming pipelines would not run
+       int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, maximumParallelism);
+
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400228990
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function miniClusterFactory;
+
+   public PerJobMiniCluster() {
+       this(new Configuration());
+   }
+
+   public PerJobMiniCluster(Configuration configuration) {
+       this(configuration, MiniCluster::new);
+   }
+
+   public PerJobMiniCluster(Configuration configuration, Function miniClusterFactory) {
+       this.configuration = configuration;
+       this.miniClusterFactory = miniClusterFactory;
+   }
+
+   /**
+    * Starts a {@link MiniCluster} and submits a job.
+    */
+   public CompletableFuture submitJob(JobGraph jobGraph) throws Exception {
+       MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
+       MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
+       miniCluster.start();
+
+       return miniCluster
+           .submitJob(jobGraph)
+           .handle((result, throwable) -> {
+               if (throwable != null) {
+                   try {
+                       throw new RuntimeException("Error submitting job to MiniCluster.", throwable);
+                   } finally {
+                       shutDownCluster(miniCluster);
+                   }
+               }
+               return new PerJobMiniClusterJobClient(result.getJobID(), miniCluster);
+           });
+   }
+
+   private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
+       Configuration configuration = this.configuration.clone();
+
+       if (!configuration.contains(RestOptions.BIND_PORT)) {
+           configuration.setString(RestOptions.BIND_PORT, "0");
+       }
+
+       int numTaskManagers = configuration.getInteger(
+           ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+           ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
+
+       // we have to use the maximum parallelism as a default here, otherwise streaming pipelines would not run
+       int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, maximumParallelism);
+
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400198502
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or 
exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function 
miniClusterFactory;
+
+   public PerJobMiniCluster() {
+   this(new Configuration());
+   }
+
+   public PerJobMiniCluster(Configuration configuration) {
+   this(configuration, MiniCluster::new);
+   }
+
+   public PerJobMiniCluster(Configuration configuration, 
Function miniClusterFactory) {
 
 Review comment:
   ```suggestion
public PerJobMiniCluster(Configuration configuration, Function miniClusterFactory) {
   ```




With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400221015
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or 
exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function 
miniClusterFactory;
+
+   public PerJobMiniCluster() {
+   this(new Configuration());
+   }
+
+   public PerJobMiniCluster(Configuration configuration) {
+   this(configuration, MiniCluster::new);
+   }
+
+   public PerJobMiniCluster(Configuration configuration, 
Function miniClusterFactory) {
+   this.configuration = configuration;
+   this.miniClusterFactory = miniClusterFactory;
+   }
+
+   /**
+* Starts a {@link MiniCluster} and submits a job.
+*/
+   public CompletableFuture submitJob(JobGraph jobGraph) throws 
Exception {
+   MiniClusterConfiguration miniClusterConfig = 
getMiniClusterConfig(jobGraph.getMaximumParallelism());
+   MiniCluster miniCluster = 
miniClusterFactory.apply(miniClusterConfig);
+   miniCluster.start();
+
+   return miniCluster
+   .submitJob(jobGraph)
+   .handle((result, throwable) -> {
+   if (throwable != null) {
+   try {
+   throw new 
RuntimeException("Error submitting job to MiniCluster.", throwable);
+   } finally {
+   shutDownCluster(miniCluster);
+   }
+   }
+   return new 
PerJobMiniClusterJobClient(result.getJobID(), miniCluster);
+   });
 
 Review comment:
   I think it is good practice to separate the happy path from the error handling:
   ```
   return miniCluster
       .submitJob(jobGraph)
       .thenApply(jobSubmissionResult -> (JobClient) new PerJobMiniClusterJobClient(jobSubmissionResult.getJobID(), miniCluster))
       .whenComplete((ignored, throwable) -> {
           if (throwable != null) {
               shutDownCluster(miniCluster);
           }
       });
   ```
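
The shape of this suggestion can be tried out without Flink. What follows is only a minimal sketch under stated assumptions: `HappyPathDemo`, `FakeCluster` and `toClient` are hypothetical names, `FakeCluster` stands in for the MiniCluster, and an `Integer` stands in for the job submission result. It shows `thenApply` carrying the happy path and `whenComplete` doing cleanup only on failure, while the original failure still reaches the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class HappyPathDemo {

    /** Hypothetical stand-in for the MiniCluster; it only records whether it was shut down. */
    static final class FakeCluster {
        final AtomicBoolean shutDown = new AtomicBoolean(false);

        void shutDown() {
            shutDown.set(true);
        }
    }

    /** Happy path in thenApply, cleanup in whenComplete; a failure is passed on unchanged. */
    static CompletableFuture<String> toClient(CompletableFuture<Integer> submission, FakeCluster cluster) {
        return submission
            .thenApply(jobId -> "client-for-job-" + jobId)
            .whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    cluster.shutDown();
                }
            });
    }

    public static void main(String[] args) {
        // Successful submission: the cluster stays up and the client is returned.
        FakeCluster healthy = new FakeCluster();
        String client = toClient(CompletableFuture.completedFuture(7), healthy).join();
        System.out.println(client + ", shut down: " + healthy.shutDown.get());

        // Failed submission: the cluster is shut down and the failure is still visible to the caller.
        FakeCluster broken = new FakeCluster();
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("submission failed"));
        try {
            toClient(failed, broken).join();
        } catch (CompletionException e) {
            System.out.println("failed with: " + e.getCause().getMessage() + ", shut down: " + broken.shutDown.get());
        }
    }
}
```

Compared with a single `handle` callback, no exception has to be rethrown by hand, because `whenComplete` leaves the outcome of the previous stage untouched.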



[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400195030
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or 
exceptional job completion.
+ * */
+public class PerJobMiniCluster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PerJobMiniCluster.class);
+
+   private final Configuration configuration;
+   private final Function 
miniClusterFactory;
+
+   public PerJobMiniCluster() {
+   this(new Configuration());
+   }
+
+   public PerJobMiniCluster(Configuration configuration) {
+   this(configuration, MiniCluster::new);
+   }
+
+   public PerJobMiniCluster(Configuration configuration, 
Function miniClusterFactory) {
+   this.configuration = configuration;
+   this.miniClusterFactory = miniClusterFactory;
+   }
 
 Review comment:
   Nit: I think it is better to provide static factory methods for the different 
argument sets because each one can be given a descriptive name. Then only a 
single private constructor taking the full set of arguments is needed.
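
To illustrate the nit, here is a minimal sketch under stated assumptions, not the PR's code: `LauncherDemo` and its methods `withDefaults`, `withConfiguration` and `withFactory` are hypothetical names, a `Map<String, String>` stands in for the Flink `Configuration`, and a `String` stands in for the cluster that the factory function creates.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

public final class LauncherDemo {

    private final Map<String, String> configuration;
    private final Function<Map<String, String>, String> clusterFactory;

    /** The only constructor: private, and always given the full set of arguments. */
    private LauncherDemo(Map<String, String> configuration, Function<Map<String, String>, String> clusterFactory) {
        this.configuration = Objects.requireNonNull(configuration);
        this.clusterFactory = Objects.requireNonNull(clusterFactory);
    }

    /** Empty configuration and the default factory. */
    static LauncherDemo withDefaults() {
        return withConfiguration(Collections.emptyMap());
    }

    /** Caller-supplied configuration, default factory. */
    static LauncherDemo withConfiguration(Map<String, String> configuration) {
        return new LauncherDemo(configuration, config -> "cluster(" + config.size() + " settings)");
    }

    /** Caller-supplied configuration and factory, e.g. to inject a test double. */
    static LauncherDemo withFactory(Map<String, String> configuration, Function<Map<String, String>, String> clusterFactory) {
        return new LauncherDemo(configuration, clusterFactory);
    }

    String launch() {
        return clusterFactory.apply(configuration);
    }

    public static void main(String[] args) {
        System.out.println(LauncherDemo.withDefaults().launch());
        System.out.println(LauncherDemo.withFactory(Collections.singletonMap("k", "v"), c -> "custom:" + c.get("k")).launch());
    }
}
```

Each call site now names the variant it wants, which overloaded constructors cannot express.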




[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400226586
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterTest.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CancelableInvokable;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link PerJobMiniCluster}.
+ */
+public class PerJobMiniClusterTest {
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private MiniCluster miniCluster;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+   temporaryFolder.create();
+   }
+
+   @Test
+   public void testJobExecution() throws Exception {
+   PerJobMiniCluster perJobMiniCluster = initializeMiniCluster();
+
+   JobClient jobClient = 
perJobMiniCluster.submitJob(getNoopJobGraph()).get();
+
+   JobExecutionResult jobExecutionResult = 
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+   assertThat(jobExecutionResult, is(notNullValue()));
+
+   Map actual = 
jobClient.getAccumulators(getClass().getClassLoader()).get();
+   assertThat(actual, is(notNullValue()));
+
+   assertThatMiniClusterIsShutdown();
+   }
+
+   @Test
+   public void testJobClient() throws Exception {
+   PerJobMiniCluster perJobMiniCluster = initializeMiniCluster();
+
+   JobGraph cancellableJobGraph = getCancellableJobGraph();
+   JobClient jobClient = perJobMiniCluster
+   .submitJob(cancellableJobGraph)
+   .get();
+
+   assertThat(jobClient.getJobID(), 
is(cancellableJobGraph.getJobID()));
+   assertThat(jobClient.getJobStatus().get(), 
is(JobStatus.RUNNING));
+
+   jobClient.cancel().get();
+
+   try {
+   
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
 
 Review comment:
   Are we expecting a certain exception here, or that this method passes?
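
Whichever outcome is intended, the expectation can be made explicit. A try/catch that only asserts inside the `catch` block also passes when nothing is thrown. The following is a minimal sketch, assuming the failure case is the expected one; `ExpectedExceptionDemo` and `expectFailure` are hypothetical names and plain `CompletableFuture`s stand in for the job result future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExpectedExceptionDemo {

    /**
     * Returns the message of the failure cause, or throws an AssertionError
     * if the future completes normally, so a missing exception cannot go unnoticed.
     */
    static String expectFailure(CompletableFuture<?> future) {
        try {
            future.get();
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the future.", e);
        }
        throw new AssertionError("Expected the future to fail, but it completed normally.");
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("job was cancelled"));
        System.out.println("failure message: " + expectFailure(failed));

        try {
            expectFailure(CompletableFuture.completedFuture("ok"));
        } catch (AssertionError e) {
            System.out.println("normal completion is rejected: " + e.getMessage());
        }
    }
}
```

If instead the call is expected to succeed, dropping the try/catch entirely lets any exception fail the test.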




[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400225294
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterTest.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CancelableInvokable;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link PerJobMiniCluster}.
+ */
+public class PerJobMiniClusterTest {
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private MiniCluster miniCluster;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+   temporaryFolder.create();
+   }
+
+   @Test
+   public void testJobExecution() throws Exception {
+   PerJobMiniCluster perJobMiniCluster = initializeMiniCluster();
+
+   JobClient jobClient = 
perJobMiniCluster.submitJob(getNoopJobGraph()).get();
+
+   JobExecutionResult jobExecutionResult = 
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+   assertThat(jobExecutionResult, is(notNullValue()));
+
+   Map actual = 
jobClient.getAccumulators(getClass().getClassLoader()).get();
+   assertThat(actual, is(notNullValue()));
+
+   assertThatMiniClusterIsShutdown();
+   }
+
+   @Test
+   public void testJobClient() throws Exception {
+   PerJobMiniCluster perJobMiniCluster = initializeMiniCluster();
+
+   JobGraph cancellableJobGraph = getCancellableJobGraph();
+   JobClient jobClient = perJobMiniCluster
+   .submitJob(cancellableJobGraph)
+   .get();
+
+   assertThat(jobClient.getJobID(), 
is(cancellableJobGraph.getJobID()));
+   assertThat(jobClient.getJobStatus().get(), 
is(JobStatus.RUNNING));
+
+   jobClient.cancel().get();
+
+   try {
+   
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+   } catch (ExecutionException e) {
+   assertThat(e.getMessage(), containsString("Failed to 
convert JobResult to JobExecutionResult."));
+   }
+
+   assertThatMiniClusterIsShutdown();
+   }
+
+   @Test
+   public void testJobClientSavepoint() throws Exception {
+   JobGraph jobGraph = new JobGraph();
+   String savepointPath = 
temporaryFolder.getRoot().getAbsolutePath();
+
+   PerJobMiniCluster perJobMiniCluster = new PerJobMiniCluster(new 
Configuration(), config -> {
+   // Use a mock in this test case to test calling of the 
savepoint methods
+   miniCluster = Mockito.mock(MiniCluster.class);
+
+   CompletableFuture 
jobSubmissionFuture = new CompletableFuture<>();
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400207865
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniCluster.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Starts a {@link MiniCluster} for every submitted job.
+ * This class guarantees to tear down the MiniCluster in case of normal or 
exceptional job completion.
+ * */
+public class PerJobMiniCluster {
 
 Review comment:
   Isn't this rather a `PerJobMiniClusterFactory`?




[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-30 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r400228021
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterTest.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CancelableInvokable;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link PerJobMiniCluster}.
+ */
+public class PerJobMiniClusterTest {
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private MiniCluster miniCluster;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+   temporaryFolder.create();
+   }
+
+   @Test
+   public void testJobExecution() throws Exception {
+   PerJobMiniCluster perJobMiniCluster = initializeMiniCluster();
+
+   JobClient jobClient = 
perJobMiniCluster.submitJob(getNoopJobGraph()).get();
+
+   JobExecutionResult jobExecutionResult = 
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+   assertThat(jobExecutionResult, is(notNullValue()));
+
+   Map actual = 
jobClient.getAccumulators(getClass().getClassLoader()).get();
+   assertThat(actual, is(notNullValue()));
+
+   assertThatMiniClusterIsShutdown();
+   }
+
+   @Test
+   public void testJobClient() throws Exception {
+   PerJobMiniCluster perJobMiniCluster = initializeMiniCluster();
+
+   JobGraph cancellableJobGraph = getCancellableJobGraph();
+   JobClient jobClient = perJobMiniCluster
+   .submitJob(cancellableJobGraph)
+   .get();
+
+   assertThat(jobClient.getJobID(), 
is(cancellableJobGraph.getJobID()));
+   assertThat(jobClient.getJobStatus().get(), 
is(JobStatus.RUNNING));
+
+   jobClient.cancel().get();
+
+   try {
+   
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
+   } catch (ExecutionException e) {
+   assertThat(e.getMessage(), containsString("Failed to 
convert JobResult to JobExecutionResult."));
+   }
+
+   assertThatMiniClusterIsShutdown();
+   }
+
+   @Test
+   public void testJobClientSavepoint() throws Exception {
+   JobGraph jobGraph = new JobGraph();
+   String savepointPath = 
temporaryFolder.getRoot().getAbsolutePath();
+
+   PerJobMiniCluster perJobMiniCluster = new PerJobMiniCluster(new 
Configuration(), config -> {
+   // Use a mock in this test case to test calling of the 
savepoint methods
+   miniCluster = Mockito.mock(MiniCluster.class);
+
+   CompletableFuture 
jobSubmissionFuture = new CompletableFuture<>();
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-25 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r397723636
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterClient.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A Client to interact with a {@link MiniCluster} and manage its lifecycle 
for a single job.
+ *
+ * Shuts down the MiniCluster after the JobResult has been received. It 
caches the JobResult to ensure the result can
+ * be retrieved even if the MiniCluster has been shut down.
+ *
+ * We want to do this to:
+ * (a) ensure the MiniCluster is torn down after the job completes (to avoid 
resource leakage)
+ * (b) avoid race condition between the async shutdown code and the job result 
retrieval.
+ */
+public class PerJobMiniClusterClient extends MiniClusterClient {
 
 Review comment:
   I think this class breaks the Liskov substitution principle. A 
`PerJobMiniClusterClient` no longer respects the contract of the 
`MiniClusterClient` because it only allows a single job to be submitted. One 
indicator of this problem is the many state checks in the methods. Hence, a 
`PerJobMiniClusterClient` should not be a `MiniClusterClient`. A 
`MiniClusterClient` could be a `PerJobMiniClusterClient`, though.
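
One possible way out is composition instead of inheritance. This is only a sketch under stated assumptions: `CompositionDemo`, `MultiJobClient` and `SingleJobClient` are hypothetical names and jobs are plain strings. The single-job type holds the general client rather than extending it, so it exposes no `submit` method and needs no state checks to reject a second job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CompositionDemo {

    /** Hypothetical general-purpose client whose contract allows any number of jobs. */
    static class MultiJobClient {
        private final List<String> submitted = new ArrayList<>();

        void submit(String job) {
            submitted.add(job);
        }

        List<String> submittedJobs() {
            return Collections.unmodifiableList(submitted);
        }
    }

    /**
     * Hypothetical single-job client. It wraps a MultiJobClient instead of extending it,
     * so no caller can pass it where the multi-job contract is expected.
     */
    static final class SingleJobClient {
        private final String job;

        SingleJobClient(MultiJobClient delegate, String job) {
            this.job = job;
            delegate.submit(job); // the one and only submission happens here
        }

        String job() {
            return job;
        }
    }

    public static void main(String[] args) {
        MultiJobClient shared = new MultiJobClient();
        SingleJobClient single = new SingleJobClient(shared, "job-1");
        System.out.println(single.job() + " submitted, total jobs: " + shared.submittedJobs().size());
    }
}
```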




[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-25 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r397705292
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterClientTest.java
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link PerJobMiniClusterClient}.
+ */
+public class PerJobMiniClusterClientTest {
 
 Review comment:
   ```suggestion
   public class PerJobMiniClusterClientTest extends TestLogger {
   ```




[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-25 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r397720848
 
 

 ##
 File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterClientTest.java
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link PerJobMiniClusterClient}.
+ */
+public class PerJobMiniClusterClientTest {
+
+    @Test
+    public void testClusterShutdown() {
+        runClusterShutdownTest(false);
+    }
+
+    @Test
+    public void testClusterShutdownWithException() {
+        runClusterShutdownTest(true);
+    }
+
+    @Test
+    public void testRequestResultBeforeSubmit() {
+        PerJobMiniClusterClient client = new PerJobMiniClusterClient(new Configuration(), mock(MiniCluster.class));
+        try {
+            client.requestJobResult(JobID.generate());
+            Assert.fail("This should have failed.");
+        } catch (IllegalStateException ignored) {
+            // That's what we want
+        }
+    }
+
+    @Test
+    public void testSubmitTwice() {
+        MiniCluster miniCluster = mock(MiniCluster.class);
+        PerJobMiniClusterClient client = new PerJobMiniClusterClient(new Configuration(), miniCluster);
+
+        JobGraph jobGraph = new JobGraph("test");
+        when(miniCluster.submitJob(jobGraph)).thenReturn(new CompletableFuture<>());
+        client.submitJob(jobGraph);
+        try {
+            client.submitJob(jobGraph);
+            Assert.fail("This should have failed.");
+        } catch (IllegalStateException ignored) {
+            // That's what we want
+        }
+    }
+
+    @Test
+    public void testRequestResultWithDifferentJobID() {
+        MiniCluster miniCluster = mock(MiniCluster.class);
+        PerJobMiniClusterClient client = new PerJobMiniClusterClient(new Configuration(), miniCluster);
+
+        JobGraph jobGraph = new JobGraph("test");
+        when(miniCluster.submitJob(jobGraph)).thenReturn(new CompletableFuture<>());
+        client.submitJob(jobGraph);
+        try {
+            client.requestJobResult(JobID.generate());
+            Assert.fail("This should have failed.");
+        } catch (IllegalStateException ignored) {
+            // That's what we want
+        }
+    }
+
+    private void runClusterShutdownTest(boolean withException) {
+        MiniCluster miniCluster = mock(MiniCluster.class);
+        CompletableFuture<JobSubmissionResult> submissionResult = new CompletableFuture<>();
+        CompletableFuture<JobResult> jobResult = new CompletableFuture<>();
+
+        JobGraph jobGraph = new JobGraph("test");
+        when(miniCluster.submitJob(jobGraph)).thenReturn(submissionResult);
+        when(miniCluster.requestJobResult(jobGraph.getJobID())).thenReturn(jobResult);
+
+        PerJobMiniClusterClient clusterClient = spy(new PerJobMiniClusterClient(new Configuration(), miniCluster));
+
+        CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);
+        // Trigger fetching the JobResult
+        jobIdFuture.complete(jobGraph.getJobID());
+
+        if (withException) {
[GitHub] [flink] tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

2020-03-25 Thread GitBox
tillrohrmann commented on a change in pull request #11473: [FLINK-16705] Ensure 
MiniCluster shutdown does not interfere with JobResult retrieval
URL: https://github.com/apache/flink/pull/11473#discussion_r397720022
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
 ##
 @@ -81,4 +114,35 @@ private Plan getWordCountPlan(File inFile, File outFile, int parallelism) {
             .writeAsCsv(outFile.getAbsolutePath());
         return env.createProgramPlan();
     }
+
+    private Plan getRuntimeExceptionPlan() {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.fromElements(1)
+            .map(element -> {
+                if (element == 1) {
+                    throw new RuntimeException("oups");
+                }
+                return element;
+            })
+            .output(new DiscardingOutputFormat<>());
+        return env.createProgramPlan();
+    }
+
+    /**
+     * A normal {@link LocalExecutor} but with the option to retrieve the Minicluster.
+     */
+    private static class InspectableLocalExecutor extends LocalExecutor {
+
+        private MiniCluster miniCluster;
+
+        @Override
+        protected MiniCluster startMiniCluster(JobGraph jobGraph, Configuration configuration) throws Exception {
+            return miniCluster = super.startMiniCluster(jobGraph, configuration);
+        }
+
+        public boolean isMiniClusterRunning() {
+            Preconditions.checkNotNull(miniCluster, "MiniCluster has not been started yet.");
+            return miniCluster.isRunning();
+        }
+    }
 
 Review comment:
   I think the proper way of making the `LocalExecutor` inspectable is not to 
increase the visibility of methods such as `startMiniCluster` but to refactor 
the `LocalExecutor` so that it takes a `MiniClusterFactory`. That way you 
can control which `MiniCluster` instance is created.
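
A minimal sketch of the factory idea above, under stated assumptions: only the name `MiniClusterFactory` comes from the comment, and its shape here is a guess. `StubMiniCluster` and `SketchLocalExecutor` are hypothetical stand-ins so that the example compiles without Flink; they are not the real `MiniCluster` or `LocalExecutor` APIs.

```java
// Hypothetical sketch: inject a factory instead of widening the visibility of startMiniCluster.
class MiniClusterFactorySketch {

    /** Stand-in for Flink's MiniCluster (assumption); only tracks a running flag. */
    public static class StubMiniCluster {
        private boolean running;

        public void start() {
            running = true;
        }

        public void close() {
            running = false;
        }

        public boolean isRunning() {
            return running;
        }
    }

    /** Hypothetical factory; the shape of the real interface is not given in the review. */
    @FunctionalInterface
    public interface MiniClusterFactory {
        StubMiniCluster create();
    }

    /** Simplified executor that obtains its cluster from the injected factory. */
    public static class SketchLocalExecutor {
        private final MiniClusterFactory factory;

        public SketchLocalExecutor(MiniClusterFactory factory) {
            this.factory = factory;
        }

        public void execute(Runnable job) {
            StubMiniCluster cluster = factory.create();
            cluster.start();
            try {
                job.run();
            } finally {
                // The cluster is shut down whether or not the job fails.
                cluster.close();
            }
        }
    }

    public static void main(String[] args) {
        // The test owns the instance, so it can inspect it without subclassing the executor.
        StubMiniCluster cluster = new StubMiniCluster();
        SketchLocalExecutor executor = new SketchLocalExecutor(() -> cluster);

        try {
            executor.execute(() -> {
                throw new RuntimeException("oups");
            });
        } catch (RuntimeException expected) {
            // The failing job is expected; only the cluster state matters here.
        }
        System.out.println("running after failed job: " + cluster.isRunning());
    }
}
```

With this shape, a test can pass in a factory that returns an instance it holds a reference to and then check `isRunning()` on it directly, so no `InspectableLocalExecutor` subclass is needed.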

