This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 420f6cac29bf795f524a40bca60adb46edfa6e3a Author: shuai-xu <shua...@foxmail.com> AuthorDate: Fri Jul 19 12:36:45 2019 +0800 [FLINK-12038][tests] Harden YARNITCase Only kill Yarn application if it does not properly terminate. This closes #9175. --- .../java/org/apache/flink/yarn/YARNITCase.java | 37 +++++++++++++++++++--- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java index 470c04d..81a299b 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.rest.RestClusterClient; @@ -32,11 +33,14 @@ import org.apache.flink.yarn.util.YarnTestUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import java.io.File; +import java.time.Duration; import java.util.Arrays; import java.util.concurrent.CompletableFuture; @@ -50,6 +54,10 @@ import static org.junit.Assert.assertThat; */ public class YARNITCase extends YarnTestBase { + private final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10); + + private final int sleepIntervalInMS = 100; + @BeforeClass public static void setup() { YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job"); @@ -113,16 +121,37 @@ public class YARNITCase extends YarnTestBase { assertThat(jobResult, is(notNullValue())); assertThat(jobResult.getSerializedThrowable().isPresent(), is(false)); + + waitApplicationFinishedElseKillIt(applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor); } finally { if (clusterClient != null) { clusterClient.shutdown(); } - - if (applicationId != null) { - yarnClusterDescriptor.killCluster(applicationId); - } } } }); } + + private void waitApplicationFinishedElseKillIt( + ApplicationId applicationId, + Duration timeout, + YarnClusterDescriptor yarnClusterDescriptor) throws Exception { + Deadline deadline = Deadline.now().plus(timeout); + YarnApplicationState state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState(); + + while (state != YarnApplicationState.FINISHED) { + if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { + Assert.fail("Application became FAILED or KILLED while expecting FINISHED"); + } + + if (deadline.isOverdue()) { + yarnClusterDescriptor.killCluster(applicationId); + Assert.fail("Application didn't finish before timeout"); + } + + sleep(sleepIntervalInMS); + state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState(); + } + } + }