tillrohrmann closed pull request #6717: [FLINK-10369][tests] Enable YARNITCase 
to test per job mode deployment
URL: https://github.com/apache/flink/pull/6717
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 37b8d410a5d..a16cb0b752c 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -22,12 +22,12 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.util.YarnTestUtils;
 
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -52,15 +52,15 @@ public TestingYarnClusterDescriptor(
                        sharedYarnClient);
                List<File> filesToShip = new ArrayList<>();
 
-               File testingJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-yarn-tests"));
+               File testingJar = YarnTestBase.findFile("..", new 
YarnTestUtils.TestJarFinder("flink-yarn-tests"));
                Preconditions.checkNotNull(testingJar, "Could not find the 
flink-yarn-tests tests jar. " +
                        "Make sure to package the flink-yarn-tests module.");
 
-               File testingRuntimeJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-runtime"));
+               File testingRuntimeJar = YarnTestBase.findFile("..", new 
YarnTestUtils.TestJarFinder("flink-runtime"));
                Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-runtime tests " +
                        "jar. Make sure to package the flink-runtime module.");
 
-               File testingYarnJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-yarn"));
+               File testingYarnJar = YarnTestBase.findFile("..", new 
YarnTestUtils.TestJarFinder("flink-yarn"));
                Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-yarn tests " +
                        "jar. Make sure to package the flink-yarn module.");
 
@@ -89,18 +89,4 @@ public YarnClusterClient deployJobCluster(
                throw new UnsupportedOperationException("Cannot deploy a 
per-job cluster yet.");
        }
 
-       static class TestJarFinder implements FilenameFilter {
-
-               private final String jarName;
-
-               TestJarFinder(final String jarName) {
-                       this.jarName = jarName;
-               }
-
-               @Override
-               public boolean accept(File dir, String name) {
-                       return name.startsWith(jarName) && 
name.endsWith("-tests.jar") &&
-                               dir.getAbsolutePath().contains(dir.separator + 
jarName + dir.separator);
-               }
-       }
 }
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 9a8f5033f3f..3625a90076f 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -60,7 +60,7 @@
 /**
  * Tests that verify correct HA behavior.
  */
-public class YARNHighAvailabilityITCase extends YarnTestBase {
+       public class YARNHighAvailabilityITCase extends YarnTestBase {
 
        private static TestingServer zkServer;
 
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 758a09866d0..814e8082f70 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -20,24 +20,31 @@
 
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.yarn.util.YarnTestUtils;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
 import java.util.Arrays;
-import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
 
 /**
  * Test cases for the deployment of Yarn Flink clusters.
@@ -50,7 +57,6 @@ public static void setup() {
                startYARNWithConfig(YARN_CONFIGURATION);
        }
 
-       @Ignore("The cluster cannot be stopped yet.")
        @Test
        public void testPerJobMode() throws Exception {
                Configuration configuration = new Configuration();
@@ -77,50 +83,55 @@ public void testPerJobMode() throws Exception {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(2);
 
-                       env.addSource(new InfiniteSource())
+                       env.addSource(new NoDataSource())
                                .shuffle()
-                               .addSink(new DiscardingSink<Integer>());
+                               .addSink(new DiscardingSink<>());
 
                        final JobGraph jobGraph = 
env.getStreamGraph().getJobGraph();
 
-                       File testingJar = YarnTestBase.findFile("..", new 
TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
+                       File testingJar = YarnTestBase.findFile("..", new 
YarnTestUtils.TestJarFinder("flink-yarn-tests"));
 
                        jobGraph.addJar(new 
org.apache.flink.core.fs.Path(testingJar.toURI()));
 
-                       ClusterClient<ApplicationId> clusterClient = 
yarnClusterDescriptor.deployJobCluster(
-                               clusterSpecification,
-                               jobGraph,
-                               true);
+                       ApplicationId applicationId = null;
+                       ClusterClient<ApplicationId> clusterClient = null;
 
-                       clusterClient.shutdown();
-               }
-       }
+                       try {
+                               clusterClient = 
yarnClusterDescriptor.deployJobCluster(
+                                       clusterSpecification,
+                                       jobGraph,
+                                       false);
+                               applicationId = clusterClient.getClusterId();
 
-       private static class InfiniteSource implements 
ParallelSourceFunction<Integer> {
+                               assertThat(clusterClient, 
is(instanceOf(RestClusterClient.class)));
+                               final RestClusterClient<ApplicationId> 
restClusterClient = (RestClusterClient<ApplicationId>) clusterClient;
 
-               private static final long serialVersionUID = 
1642561062000662861L;
-               private volatile boolean running;
-               private final Random random;
+                               final CompletableFuture<JobResult> 
jobResultCompletableFuture = 
restClusterClient.requestJobResult(jobGraph.getJobID());
 
-               InfiniteSource() {
-                       running = true;
-                       random = new Random();
-               }
+                               final JobResult jobResult = 
jobResultCompletableFuture.get();
 
-               @Override
-               public void run(SourceContext<Integer> ctx) throws Exception {
-                       while (running) {
-                               synchronized (ctx.getCheckpointLock()) {
-                                       ctx.collect(random.nextInt());
+                               assertThat(jobResult, is(notNullValue()));
+                               
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+                       } finally {
+                               if (clusterClient != null) {
+                                       clusterClient.shutdown();
                                }
 
-                               Thread.sleep(5L);
+                               if (applicationId != null) {
+                                       
yarnClusterDescriptor.killCluster(applicationId);
+                               }
                        }
                }
+       }
+
+       private static class NoDataSource implements 
ParallelSourceFunction<Integer> {
+
+               private static final long serialVersionUID = 
1642561062000662861L;
 
                @Override
-               public void cancel() {
-                       running = false;
-               }
+               public void run(SourceContext<Integer> ctx) {}
+
+               @Override
+               public void cancel() {}
        }
 }
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
index 25d833b570f..f7a9634b872 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
 
 /**
  * Utility methods for YARN tests.
@@ -33,4 +34,22 @@ public static File getTestJarPath(String fileName) throws 
FileNotFoundException
                }
                return f;
        }
+
+       /**
+        * Filename filter which finds the test jar for the given name.
+        */
+       public static class TestJarFinder implements FilenameFilter {
+
+               private final String jarName;
+
+               public TestJarFinder(final String jarName) {
+                       this.jarName = jarName;
+               }
+
+               @Override
+               public boolean accept(File dir, String name) {
+                       return name.startsWith(jarName) && 
name.endsWith("-tests.jar") &&
+                               dir.getAbsolutePath().contains(File.separator + 
jarName + File.separator);
+               }
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to