This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new ddff5f8  [ZEPPELIN-5560] spark yarn app end with failed status in 
yarn-cluster mode
ddff5f8 is described below

commit ddff5f8c2ef4758928d2f0eb494b474c43de1a44
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Oct 13 14:37:57 2021 +0800

    [ZEPPELIN-5560] spark yarn app end with failed status in yarn-cluster mode
    
    ### What is this PR for?
    
    The root cause is that `RemoteInterpreterServer` calls System.exit to 
forcibly shut down the Spark driver in yarn-cluster mode.
    This PR disables forceShutdown in Spark's yarn-cluster mode.
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5560
    
    ### How should this be tested?
    * CI
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #4255 from zjffdu/ZEPPELIN-5560 and squashes the following commits:
    
    5b4aceeb95 [Jeff Zhang] [ZEPPELIN-5560] spark yarn app end with failed 
status in yarn-cluster mode
---
 zeppelin-interpreter-integration/README.md              |  6 ++++--
 .../zeppelin/integration/SparkIntegrationTest.java      | 17 ++++++++++++++++-
 .../interpreter/launcher/SparkInterpreterLauncher.java  | 11 ++++++-----
 .../launcher/SparkInterpreterLauncherTest.java          |  2 +-
 4 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/zeppelin-interpreter-integration/README.md 
b/zeppelin-interpreter-integration/README.md
index bf235ff..6f2cec9 100644
--- a/zeppelin-interpreter-integration/README.md
+++ b/zeppelin-interpreter-integration/README.md
@@ -1,4 +1,6 @@
 ## How to run Zeppelin integration tests
 
-If you have hadoop installed on your machine, please make sure to unset hadoop 
related enviromnets:
-* HADOOP_CONF_DIR
\ No newline at end of file
+If you have hadoop installed on your machine, please make sure to unset hadoop 
related environments:
+* HADOOP_CONF_DIR
+
+If you want to run integration tests in IDE, please set `ZEPPELIN_HOME` 
manually.
diff --git 
a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
 
b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 1dec6ee..2c4c278 100644
--- 
a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ 
b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -20,6 +20,8 @@ package org.apache.zeppelin.integration;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.maven.model.Model;
@@ -46,6 +48,8 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -249,6 +253,7 @@ public abstract class SparkIntegrationTest {
     // parameters with whitespace
     sparkInterpreterSetting.setProperty("spark.app.name", "hello spark");
 
+    String yarnAppId = null;
     try {
       setUpSparkInterpreterSetting(sparkInterpreterSetting);
       testInterpreterBasics();
@@ -258,10 +263,20 @@ public abstract class SparkIntegrationTest {
       GetApplicationsResponse response = 
hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
       assertEquals(1, response.getApplicationList().size());
       assertEquals("hello spark", 
response.getApplicationList().get(0).getName());
-
+      yarnAppId = 
response.getApplicationList().get(0).getApplicationId().toString();
     } finally {
       interpreterSettingManager.close();
       waitForYarnAppCompleted(30 * 1000);
+
+      if (yarnAppId != null) {
+        // ensure yarn app is finished with SUCCEEDED status.
+        final String finalYarnAppId = yarnAppId;
+        GetApplicationsRequest request = 
GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.FINISHED));
+        GetApplicationsResponse response = 
hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+        List<ApplicationReport> apps = 
response.getApplicationList().stream().filter(app -> 
app.getApplicationId().toString().equals(finalYarnAppId)).collect(Collectors.toList());
+        assertEquals(1, apps.size());
+        assertEquals(FinalApplicationStatus.SUCCEEDED, 
apps.get(0).getFinalApplicationStatus());
+      }
     }
   }
 
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 50bdc88..8f8eedc 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -98,9 +98,14 @@ public class SparkInterpreterLauncher extends 
StandardInterpreterLauncher {
       sparkProperties.setProperty("spark.pyspark.python", condaEnvName + 
"/bin/python");
     }
 
-    if (isYarnMode() && getDeployMode().equals("cluster")) {
+    if (isYarnCluster()) {
       env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
       sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", 
"false");
+      // Need to set `zeppelin.interpreter.forceShutdown` in interpreter 
properties directly
+      // instead of updating sparkProperties.
+      // Because `zeppelin.interpreter.forceShutdown` is initialized in 
RemoteInterpreterServer
+      // before SparkInterpreter is created.
+      context.getProperties().put("zeppelin.interpreter.forceShutdown", 
"false");
     } else if (zConf.isOnlyYarnCluster()){
       throw new IOException("Only yarn-cluster mode is allowed, please set " +
               
ZeppelinConfiguration.ConfVars.ZEPPELIN_SPARK_ONLY_YARN_CLUSTER.getVarName() +
@@ -422,8 +427,4 @@ public class SparkInterpreterLauncher extends 
StandardInterpreterLauncher {
   private boolean isYarnCluster() {
     return isYarnMode() && "cluster".equalsIgnoreCase(getDeployMode());
   }
-
-  private boolean isYarnClient() {
-    return isYarnMode() && "client".equalsIgnoreCase(getDeployMode());
-  }
 }
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index f99bb21..b1d13da 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -52,7 +52,7 @@ public class SparkInterpreterLauncherTest {
       System.clearProperty(confVar.getVarName());
     }
 
-    sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
+    sparkHome = DownloadUtils.downloadSpark("2.4.7", "2.7");
     
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
             new File("..").getAbsolutePath());
 

Reply via email to