This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new ddff5f8 [ZEPPELIN-5560] spark yarn app end with failed status in yarn-cluster mode ddff5f8 is described below commit ddff5f8c2ef4758928d2f0eb494b474c43de1a44 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed Oct 13 14:37:57 2021 +0800 [ZEPPELIN-5560] spark yarn app end with failed status in yarn-cluster mode ### What is this PR for? The root cause is that `RemoteInterpreterServer` calls `System.exit` to force-shutdown the Spark driver in yarn-cluster mode, which makes the YARN application end with a FAILED status. This PR disables `zeppelin.interpreter.forceShutdown` in Spark's yarn-cluster mode. ### What type of PR is it? [ Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5560 ### How should this be tested? * CI ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #4255 from zjffdu/ZEPPELIN-5560 and squashes the following commits: 5b4aceeb95 [Jeff Zhang] [ZEPPELIN-5560] spark yarn app end with failed status in yarn-cluster mode --- zeppelin-interpreter-integration/README.md | 6 ++++-- .../zeppelin/integration/SparkIntegrationTest.java | 17 ++++++++++++++++- .../interpreter/launcher/SparkInterpreterLauncher.java | 11 ++++++----- .../launcher/SparkInterpreterLauncherTest.java | 2 +- 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/zeppelin-interpreter-integration/README.md b/zeppelin-interpreter-integration/README.md index bf235ff..6f2cec9 100644 --- a/zeppelin-interpreter-integration/README.md +++ b/zeppelin-interpreter-integration/README.md @@ -1,4 +1,6 @@ ## How to run Zeppelin integration tests -If you have hadoop installed on your machine, please make sure to unset hadoop related enviromnets: -* HADOOP_CONF_DIR \ No newline at end of file +If you have hadoop installed on your machine, please make sure to unset hadoop 
related environments: +* HADOOP_CONF_DIR + +If you want to run integration tests in IDE, please set `ZEPPELIN_HOME` manually. diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java index 1dec6ee..2c4c278 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java @@ -20,6 +20,8 @@ package org.apache.zeppelin.integration; import org.apache.commons.io.IOUtils; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.maven.model.Model; @@ -46,6 +48,8 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.EnumSet; +import java.util.List; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -249,6 +253,7 @@ public abstract class SparkIntegrationTest { // parameters with whitespace sparkInterpreterSetting.setProperty("spark.app.name", "hello spark"); + String yarnAppId = null; try { setUpSparkInterpreterSetting(sparkInterpreterSetting); testInterpreterBasics(); @@ -258,10 +263,20 @@ public abstract class SparkIntegrationTest { GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request); assertEquals(1, response.getApplicationList().size()); assertEquals("hello spark", 
response.getApplicationList().get(0).getName()); - + yarnAppId = response.getApplicationList().get(0).getApplicationId().toString(); } finally { interpreterSettingManager.close(); waitForYarnAppCompleted(30 * 1000); + + if (yarnAppId != null) { + // ensure yarn app is finished with SUCCEEDED status. + final String finalYarnAppId = yarnAppId; + GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.FINISHED)); + GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request); + List<ApplicationReport> apps = response.getApplicationList().stream().filter(app -> app.getApplicationId().toString().equals(finalYarnAppId)).collect(Collectors.toList()); + assertEquals(1, apps.size()); + assertEquals(FinalApplicationStatus.SUCCEEDED, apps.get(0).getFinalApplicationStatus()); + } } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 50bdc88..8f8eedc 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -98,9 +98,14 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { sparkProperties.setProperty("spark.pyspark.python", condaEnvName + "/bin/python"); } - if (isYarnMode() && getDeployMode().equals("cluster")) { + if (isYarnCluster()) { env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true"); sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", "false"); + // Need to set `zeppelin.interpreter.forceShutdown` in interpreter properties directly + // instead of updating sparkProperties. 
+ // Because `zeppelin.interpreter.forceShutdown` is initialized in RemoteInterpreterServer + // before SparkInterpreter is created. + context.getProperties().put("zeppelin.interpreter.forceShutdown", "false"); } else if (zConf.isOnlyYarnCluster()){ throw new IOException("Only yarn-cluster mode is allowed, please set " + ZeppelinConfiguration.ConfVars.ZEPPELIN_SPARK_ONLY_YARN_CLUSTER.getVarName() + @@ -422,8 +427,4 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { private boolean isYarnCluster() { return isYarnMode() && "cluster".equalsIgnoreCase(getDeployMode()); } - - private boolean isYarnClient() { - return isYarnMode() && "client".equalsIgnoreCase(getDeployMode()); - } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index f99bb21..b1d13da 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -52,7 +52,7 @@ public class SparkInterpreterLauncherTest { System.clearProperty(confVar.getVarName()); } - sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7"); + sparkHome = DownloadUtils.downloadSpark("2.4.7", "2.7"); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("..").getAbsolutePath());