This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 420f6cac29bf795f524a40bca60adb46edfa6e3a
Author: shuai-xu <shua...@foxmail.com>
AuthorDate: Fri Jul 19 12:36:45 2019 +0800

    [FLINK-12038][tests] Harden YARNITCase
    
    Only kill Yarn application if it does not properly terminate.
    
    This closes #9175.
---
 .../java/org/apache/flink/yarn/YARNITCase.java     | 37 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 4 deletions(-)

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 470c04d..81a299b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
@@ -32,11 +33,14 @@ import org.apache.flink.yarn.util.YarnTestUtils;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 
@@ -50,6 +54,10 @@ import static org.junit.Assert.assertThat;
  */
 public class YARNITCase extends YarnTestBase {
 
+       private final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
+
+       private final int sleepIntervalInMS = 100;
+
        @BeforeClass
        public static void setup() {
                YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-per-job");
@@ -113,16 +121,37 @@ public class YARNITCase extends YarnTestBase {
 
                                        assertThat(jobResult, 
is(notNullValue()));
                                        
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+
+                                       
waitApplicationFinishedElseKillIt(applicationId, yarnAppTerminateTimeout, 
yarnClusterDescriptor);
                                } finally {
                                        if (clusterClient != null) {
                                                clusterClient.shutdown();
                                        }
-
-                                       if (applicationId != null) {
-                                               
yarnClusterDescriptor.killCluster(applicationId);
-                                       }
                                }
                        }
                });
        }
+
+       private void waitApplicationFinishedElseKillIt(
+                       ApplicationId applicationId,
+                       Duration timeout,
+                       YarnClusterDescriptor yarnClusterDescriptor) throws 
Exception {
+               Deadline deadline = Deadline.now().plus(timeout);
+               YarnApplicationState state = 
getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
+
+               while (state != YarnApplicationState.FINISHED) {
+                       if (state == YarnApplicationState.FAILED || state == 
YarnApplicationState.KILLED) {
+                               Assert.fail("Application became FAILED or 
KILLED while expecting FINISHED");
+                       }
+
+                       if (deadline.isOverdue()) {
+                               
yarnClusterDescriptor.killCluster(applicationId);
+                               Assert.fail("Application didn't finish before 
timeout");
+                       }
+
+                       sleep(sleepIntervalInMS);
+                       state = 
getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
+               }
+       }
+
 }

Reply via email to